]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_tdb.c
tdb2: add stats to tdb1 backend.
[ccan] / ccan / tdb2 / tdb1_tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb1_private.h"
29 #include <assert.h>
30
31 /*
32   non-blocking increment of the tdb sequence number if the tdb has been opened using
33   the TDB_SEQNUM flag
34 */
35 void tdb1_increment_seqnum_nonblock(struct tdb_context *tdb)
36 {
37         tdb1_off_t seqnum=0;
38
39         if (!(tdb->flags & TDB_SEQNUM)) {
40                 return;
41         }
42
43         /* we ignore errors from this, as we have no sane way of
44            dealing with them.
45         */
46         tdb1_ofs_read(tdb, TDB1_SEQNUM_OFS, &seqnum);
47         seqnum++;
48         tdb1_ofs_write(tdb, TDB1_SEQNUM_OFS, &seqnum);
49 }
50
51 /*
52   increment the tdb sequence number if the tdb has been opened using
53   the TDB_SEQNUM flag
54 */
55 static void tdb1_increment_seqnum(struct tdb_context *tdb)
56 {
57         if (!(tdb->flags & TDB_SEQNUM)) {
58                 return;
59         }
60
61         if (tdb1_nest_lock(tdb, TDB1_SEQNUM_OFS, F_WRLCK,
62                            TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
63                 return;
64         }
65
66         tdb1_increment_seqnum_nonblock(tdb);
67
68         tdb1_nest_unlock(tdb, TDB1_SEQNUM_OFS, F_WRLCK);
69 }
70
71 static int tdb1_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
72 {
73         return memcmp(data.dptr, key.dptr, data.dsize);
74 }
75
76 /* Returns 0 on fail.  On success, return offset of record, and fills
77    in rec */
78 static tdb1_off_t tdb1_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
79                         struct tdb1_record *r)
80 {
81         tdb1_off_t rec_ptr;
82
83         /* read in the hash top */
84         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
85                 return 0;
86
87         /* keep looking until we find the right record */
88         while (rec_ptr) {
89                 if (tdb1_rec_read(tdb, rec_ptr, r) == -1)
90                         return 0;
91
92                 tdb->stats.compares++;
93                 if (TDB1_DEAD(r)) {
94                         tdb->stats.compare_wrong_bucket++;
95                 } else if (key.dsize != r->key_len) {
96                         tdb->stats.compare_wrong_keylen++;
97                 } else if (hash != r->full_hash) {
98                         tdb->stats.compare_wrong_rechash++;
99                 } else if (tdb1_parse_data(tdb, key, rec_ptr + sizeof(*r),
100                                            r->key_len, tdb1_key_compare,
101                                            NULL) != 0) {
102                         tdb->stats.compare_wrong_keycmp++;
103                 } else {
104                         return rec_ptr;
105                 }
106                 /* detect tight infinite loop */
107                 if (rec_ptr == r->next) {
108                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
109                                                 TDB_LOG_ERROR,
110                                                 "tdb1_find: loop detected.");
111                         return 0;
112                 }
113                 rec_ptr = r->next;
114         }
115         tdb->last_error = TDB_ERR_NOEXIST;
116         return 0;
117 }
118
119 /* As tdb1_find, but if you succeed, keep the lock */
120 tdb1_off_t tdb1_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
121                            struct tdb1_record *rec)
122 {
123         uint32_t rec_ptr;
124
125         if (tdb1_lock(tdb, TDB1_BUCKET(hash), locktype) == -1)
126                 return 0;
127         if (!(rec_ptr = tdb1_find(tdb, key, hash, rec)))
128                 tdb1_unlock(tdb, TDB1_BUCKET(hash), locktype);
129         return rec_ptr;
130 }
131
132 static TDB_DATA _tdb1_fetch(struct tdb_context *tdb, TDB_DATA key);
133
134 /* update an entry in place - this only works if the new data size
135    is <= the old data size and the key exists.
136    on failure return -1.
137 */
138 static int tdb1_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
139 {
140         struct tdb1_record rec;
141         tdb1_off_t rec_ptr;
142
143         /* find entry */
144         if (!(rec_ptr = tdb1_find(tdb, key, hash, &rec)))
145                 return -1;
146
147         /* it could be an exact duplicate of what is there - this is
148          * surprisingly common (eg. with a ldb re-index). */
149         if (rec.key_len == key.dsize &&
150             rec.data_len == dbuf.dsize &&
151             rec.full_hash == hash) {
152                 TDB_DATA data = _tdb1_fetch(tdb, key);
153                 if (data.dsize == dbuf.dsize &&
154                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
155                         if (data.dptr) {
156                                 free(data.dptr);
157                         }
158                         return 0;
159                 }
160                 if (data.dptr) {
161                         free(data.dptr);
162                 }
163         }
164
165         /* must be long enough key, data and tailer */
166         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb1_off_t)) {
167                 tdb->last_error = TDB_SUCCESS; /* Not really an error */
168                 return -1;
169         }
170
171         if (tdb->tdb1.io->tdb1_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
172                       dbuf.dptr, dbuf.dsize) == -1)
173                 return -1;
174
175         if (dbuf.dsize != rec.data_len) {
176                 /* update size */
177                 rec.data_len = dbuf.dsize;
178                 return tdb1_rec_write(tdb, rec_ptr, &rec);
179         }
180
181         return 0;
182 }
183
184 /* find an entry in the database given a key */
185 /* If an entry doesn't exist tdb1_err will be set to
186  * TDB_ERR_NOEXIST. If a key has no data attached
187  * then the TDB_DATA will have zero length but
188  * a non-zero pointer
189  */
190 static TDB_DATA _tdb1_fetch(struct tdb_context *tdb, TDB_DATA key)
191 {
192         tdb1_off_t rec_ptr;
193         struct tdb1_record rec;
194         TDB_DATA ret;
195         uint32_t hash;
196
197         /* find which hash bucket it is in */
198         hash = tdb_hash(tdb, key.dptr, key.dsize);
199         if (!(rec_ptr = tdb1_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
200                 ret.dptr = NULL;
201                 ret.dsize = 0;
202                 return ret;
203         }
204
205         ret.dptr = tdb1_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
206                                   rec.data_len);
207         ret.dsize = rec.data_len;
208         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
209         return ret;
210 }
211
212 enum TDB_ERROR tdb1_fetch(struct tdb_context *tdb, TDB_DATA key, TDB_DATA *data)
213 {
214         *data = _tdb1_fetch(tdb, key);
215         if (data->dptr == NULL)
216                 return tdb->last_error;
217         return TDB_SUCCESS;
218 }
219
220 enum TDB_ERROR tdb1_parse_record(struct tdb_context *tdb, TDB_DATA key,
221                                  enum TDB_ERROR (*parser)(TDB_DATA key,
222                                                           TDB_DATA data,
223                                                           void *private_data),
224                                  void *private_data)
225 {
226         tdb1_off_t rec_ptr;
227         struct tdb1_record rec;
228         enum TDB_ERROR ret;
229         uint32_t hash;
230
231         /* find which hash bucket it is in */
232         hash = tdb_hash(tdb, key.dptr, key.dsize);
233
234         if (!(rec_ptr = tdb1_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
235                 /* record not found */
236                 return TDB_ERR_NOEXIST;
237         }
238
239         ret = tdb1_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
240                              rec.data_len, parser, private_data);
241
242         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
243
244         return ret;
245 }
246
247 /* check if an entry in the database exists
248
249    note that 1 is returned if the key is found and 0 is returned if not found
250    this doesn't match the conventions in the rest of this module, but is
251    compatible with gdbm
252 */
253 static int tdb1_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
254 {
255         struct tdb1_record rec;
256
257         if (tdb1_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
258                 return 0;
259         tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_RDLCK);
260         return 1;
261 }
262
263 int tdb1_exists(struct tdb_context *tdb, TDB_DATA key)
264 {
265         uint32_t hash = tdb_hash(tdb, key.dptr, key.dsize);
266         int ret;
267
268         assert(tdb->flags & TDB_VERSION1);
269         ret = tdb1_exists_hash(tdb, key, hash);
270         return ret;
271 }
272
273 /* actually delete an entry in the database given the offset */
274 int tdb1_do_delete(struct tdb_context *tdb, tdb1_off_t rec_ptr, struct tdb1_record *rec)
275 {
276         tdb1_off_t last_ptr, i;
277         struct tdb1_record lastrec;
278
279         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) return -1;
280
281         if (((tdb->tdb1.traverse_write != 0) && (!TDB1_DEAD(rec))) ||
282             tdb1_write_lock_record(tdb, rec_ptr) == -1) {
283                 /* Someone traversing here: mark it as dead */
284                 rec->magic = TDB1_DEAD_MAGIC;
285                 return tdb1_rec_write(tdb, rec_ptr, rec);
286         }
287         if (tdb1_write_unlock_record(tdb, rec_ptr) != 0)
288                 return -1;
289
290         /* find previous record in hash chain */
291         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(rec->full_hash), &i) == -1)
292                 return -1;
293         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
294                 if (tdb1_rec_read(tdb, i, &lastrec) == -1)
295                         return -1;
296
297         /* unlink it: next ptr is at start of record. */
298         if (last_ptr == 0)
299                 last_ptr = TDB1_HASH_TOP(rec->full_hash);
300         if (tdb1_ofs_write(tdb, last_ptr, &rec->next) == -1)
301                 return -1;
302
303         /* recover the space */
304         if (tdb1_free(tdb, rec_ptr, rec) == -1)
305                 return -1;
306         return 0;
307 }
308
309 static int tdb1_count_dead(struct tdb_context *tdb, uint32_t hash)
310 {
311         int res = 0;
312         tdb1_off_t rec_ptr;
313         struct tdb1_record rec;
314
315         /* read in the hash top */
316         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
317                 return 0;
318
319         while (rec_ptr) {
320                 if (tdb1_rec_read(tdb, rec_ptr, &rec) == -1)
321                         return 0;
322
323                 if (rec.magic == TDB1_DEAD_MAGIC) {
324                         res += 1;
325                 }
326                 rec_ptr = rec.next;
327         }
328         return res;
329 }
330
331 /*
332  * Purge all DEAD records from a hash chain
333  */
334 static int tdb1_purge_dead(struct tdb_context *tdb, uint32_t hash)
335 {
336         int res = -1;
337         struct tdb1_record rec;
338         tdb1_off_t rec_ptr;
339
340         if (tdb1_lock(tdb, -1, F_WRLCK) == -1) {
341                 return -1;
342         }
343
344         /* read in the hash top */
345         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
346                 goto fail;
347
348         while (rec_ptr) {
349                 tdb1_off_t next;
350
351                 if (tdb1_rec_read(tdb, rec_ptr, &rec) == -1) {
352                         goto fail;
353                 }
354
355                 next = rec.next;
356
357                 if (rec.magic == TDB1_DEAD_MAGIC
358                     && tdb1_do_delete(tdb, rec_ptr, &rec) == -1) {
359                         goto fail;
360                 }
361                 rec_ptr = next;
362         }
363         res = 0;
364  fail:
365         tdb1_unlock(tdb, -1, F_WRLCK);
366         return res;
367 }
368
369 /* delete an entry in the database given a key */
370 static int tdb1_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
371 {
372         tdb1_off_t rec_ptr;
373         struct tdb1_record rec;
374         int ret;
375
376         if (tdb->tdb1.max_dead_records != 0) {
377
378                 /*
379                  * Allow for some dead records per hash chain, mainly for
380                  * tdb's with a very high create/delete rate like locking.tdb.
381                  */
382
383                 if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
384                         return -1;
385
386                 if (tdb1_count_dead(tdb, hash) >= tdb->tdb1.max_dead_records) {
387                         /*
388                          * Don't let the per-chain freelist grow too large,
389                          * delete all existing dead records
390                          */
391                         tdb1_purge_dead(tdb, hash);
392                 }
393
394                 if (!(rec_ptr = tdb1_find(tdb, key, hash, &rec))) {
395                         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
396                         return -1;
397                 }
398
399                 /*
400                  * Just mark the record as dead.
401                  */
402                 rec.magic = TDB1_DEAD_MAGIC;
403                 ret = tdb1_rec_write(tdb, rec_ptr, &rec);
404         }
405         else {
406                 if (!(rec_ptr = tdb1_find_lock_hash(tdb, key, hash, F_WRLCK,
407                                                    &rec)))
408                         return -1;
409
410                 ret = tdb1_do_delete(tdb, rec_ptr, &rec);
411         }
412
413         if (ret == 0) {
414                 tdb1_increment_seqnum(tdb);
415         }
416
417         if (tdb1_unlock(tdb, TDB1_BUCKET(rec.full_hash), F_WRLCK) != 0)
418                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
419                            "tdb1_delete: WARNING tdb1_unlock failed!");
420         return ret;
421 }
422
423 int tdb1_delete(struct tdb_context *tdb, TDB_DATA key)
424 {
425         uint32_t hash = tdb_hash(tdb, key.dptr, key.dsize);
426         int ret;
427
428         assert(tdb->flags & TDB_VERSION1);
429         ret = tdb1_delete_hash(tdb, key, hash);
430         return ret;
431 }
432
433 /*
434  * See if we have a dead record around with enough space
435  */
436 static tdb1_off_t tdb1_find_dead(struct tdb_context *tdb, uint32_t hash,
437                                struct tdb1_record *r, tdb1_len_t length)
438 {
439         tdb1_off_t rec_ptr;
440
441         /* read in the hash top */
442         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1)
443                 return 0;
444
445         /* keep looking until we find the right record */
446         while (rec_ptr) {
447                 if (tdb1_rec_read(tdb, rec_ptr, r) == -1)
448                         return 0;
449
450                 if (TDB1_DEAD(r) && r->rec_len >= length) {
451                         /*
452                          * First fit for simple coding, TODO: change to best
453                          * fit
454                          */
455                         return rec_ptr;
456                 }
457                 rec_ptr = r->next;
458         }
459         return 0;
460 }
461
462 static int _tdb1_store(struct tdb_context *tdb, TDB_DATA key,
463                        TDB_DATA dbuf, int flag, uint32_t hash)
464 {
465         struct tdb1_record rec;
466         tdb1_off_t rec_ptr;
467         char *p = NULL;
468         int ret = -1;
469
470         /* check for it existing, on insert. */
471         if (flag == TDB_INSERT) {
472                 if (tdb1_exists_hash(tdb, key, hash)) {
473                         tdb->last_error = TDB_ERR_EXISTS;
474                         goto fail;
475                 }
476         } else {
477                 /* first try in-place update, on modify or replace. */
478                 if (tdb1_update_hash(tdb, key, hash, dbuf) == 0) {
479                         goto done;
480                 }
481                 if (tdb->last_error == TDB_ERR_NOEXIST &&
482                     flag == TDB_MODIFY) {
483                         /* if the record doesn't exist and we are in TDB1_MODIFY mode then
484                          we should fail the store */
485                         goto fail;
486                 }
487         }
488         /* reset the error code potentially set by the tdb1_update() */
489         tdb->last_error = TDB_SUCCESS;
490
491         /* delete any existing record - if it doesn't exist we don't
492            care.  Doing this first reduces fragmentation, and avoids
493            coalescing with `allocated' block before it's updated. */
494         if (flag != TDB_INSERT)
495                 tdb1_delete_hash(tdb, key, hash);
496
497         /* Copy key+value *before* allocating free space in case malloc
498            fails and we are left with a dead spot in the tdb. */
499
500         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
501                 tdb->last_error = TDB_ERR_OOM;
502                 goto fail;
503         }
504
505         memcpy(p, key.dptr, key.dsize);
506         if (dbuf.dsize)
507                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
508
509         if (tdb->tdb1.max_dead_records != 0) {
510                 /*
511                  * Allow for some dead records per hash chain, look if we can
512                  * find one that can hold the new record. We need enough space
513                  * for key, data and tailer. If we find one, we don't have to
514                  * consult the central freelist.
515                  */
516                 rec_ptr = tdb1_find_dead(
517                         tdb, hash, &rec,
518                         key.dsize + dbuf.dsize + sizeof(tdb1_off_t));
519
520                 if (rec_ptr != 0) {
521                         rec.key_len = key.dsize;
522                         rec.data_len = dbuf.dsize;
523                         rec.full_hash = hash;
524                         rec.magic = TDB1_MAGIC;
525                         if (tdb1_rec_write(tdb, rec_ptr, &rec) == -1
526                             || tdb->tdb1.io->tdb1_write(
527                                     tdb, rec_ptr + sizeof(rec),
528                                     p, key.dsize + dbuf.dsize) == -1) {
529                                 goto fail;
530                         }
531                         goto done;
532                 }
533         }
534
535         /*
536          * We have to allocate some space from the freelist, so this means we
537          * have to lock it. Use the chance to purge all the DEAD records from
538          * the hash chain under the freelist lock.
539          */
540
541         if (tdb1_lock(tdb, -1, F_WRLCK) == -1) {
542                 goto fail;
543         }
544
545         if ((tdb->tdb1.max_dead_records != 0)
546             && (tdb1_purge_dead(tdb, hash) == -1)) {
547                 tdb1_unlock(tdb, -1, F_WRLCK);
548                 goto fail;
549         }
550
551         /* we have to allocate some space */
552         rec_ptr = tdb1_allocate(tdb, key.dsize + dbuf.dsize, &rec);
553
554         tdb1_unlock(tdb, -1, F_WRLCK);
555
556         if (rec_ptr == 0) {
557                 goto fail;
558         }
559
560         /* Read hash top into next ptr */
561         if (tdb1_ofs_read(tdb, TDB1_HASH_TOP(hash), &rec.next) == -1)
562                 goto fail;
563
564         rec.key_len = key.dsize;
565         rec.data_len = dbuf.dsize;
566         rec.full_hash = hash;
567         rec.magic = TDB1_MAGIC;
568
569         /* write out and point the top of the hash chain at it */
570         if (tdb1_rec_write(tdb, rec_ptr, &rec) == -1
571             || tdb->tdb1.io->tdb1_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
572             || tdb1_ofs_write(tdb, TDB1_HASH_TOP(hash), &rec_ptr) == -1) {
573                 /* Need to tdb1_unallocate() here */
574                 goto fail;
575         }
576
577  done:
578         ret = 0;
579  fail:
580         if (ret == 0) {
581                 tdb1_increment_seqnum(tdb);
582         }
583
584         SAFE_FREE(p);
585         return ret;
586 }
587
588 /* store an element in the database, replacing any existing element
589    with the same key
590
591    return 0 on success, -1 on failure
592 */
593 int tdb1_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
594 {
595         uint32_t hash;
596         int ret;
597
598         assert(tdb->flags & TDB_VERSION1);
599
600         if ((tdb->flags & TDB_RDONLY) || tdb->tdb1.traverse_read) {
601                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
602                                              TDB_LOG_USE_ERROR,
603                                              "tdb_store: read-only tdb");
604                 return -1;
605         }
606
607         /* find which hash bucket it is in */
608         hash = tdb_hash(tdb, key.dptr, key.dsize);
609         if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
610                 return -1;
611
612         ret = _tdb1_store(tdb, key, dbuf, flag, hash);
613         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
614         return ret;
615 }
616
617 /* Append to an entry. Create if not exist. */
618 int tdb1_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
619 {
620         uint32_t hash;
621         TDB_DATA dbuf;
622         int ret = -1;
623
624         assert(tdb->flags & TDB_VERSION1);
625
626         /* find which hash bucket it is in */
627         hash = tdb_hash(tdb, key.dptr, key.dsize);
628         if (tdb1_lock(tdb, TDB1_BUCKET(hash), F_WRLCK) == -1)
629                 return -1;
630
631         dbuf = _tdb1_fetch(tdb, key);
632
633         if (dbuf.dptr == NULL) {
634                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
635         } else {
636                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
637                 unsigned char *new_dptr;
638
639                 /* realloc '0' is special: don't do that. */
640                 if (new_len == 0)
641                         new_len = 1;
642                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
643                 if (new_dptr == NULL) {
644                         free(dbuf.dptr);
645                 }
646                 dbuf.dptr = new_dptr;
647         }
648
649         if (dbuf.dptr == NULL) {
650                 tdb->last_error = TDB_ERR_OOM;
651                 goto failed;
652         }
653
654         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
655         dbuf.dsize += new_dbuf.dsize;
656
657         ret = _tdb1_store(tdb, key, dbuf, 0, hash);
658
659 failed:
660         tdb1_unlock(tdb, TDB1_BUCKET(hash), F_WRLCK);
661         SAFE_FREE(dbuf.dptr);
662         return ret;
663 }
664
665
666 /*
667   get the tdb sequence number. Only makes sense if the writers opened
668   with TDB1_SEQNUM set. Note that this sequence number will wrap quite
669   quickly, so it should only be used for a 'has something changed'
670   test, not for code that relies on the count of the number of changes
671   made. If you want a counter then use a tdb record.
672
673   The aim of this sequence number is to allow for a very lightweight
674   test of a possible tdb change.
675 */
676 int tdb1_get_seqnum(struct tdb_context *tdb)
677 {
678         tdb1_off_t seqnum=0;
679
680         tdb1_ofs_read(tdb, TDB1_SEQNUM_OFS, &seqnum);
681         return seqnum;
682 }
683
684
685 /*
686   add a region of the file to the freelist. Length is the size of the region in bytes,
687   which includes the free list header that needs to be added
688  */
689 static int tdb1_free_region(struct tdb_context *tdb, tdb1_off_t offset, ssize_t length)
690 {
691         struct tdb1_record rec;
692         if (length <= sizeof(rec)) {
693                 /* the region is not worth adding */
694                 return 0;
695         }
696         if (length + offset > tdb->file->map_size) {
697                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
698                                         "tdb1_free_region: adding region beyond"
699                                         " end of file");
700                 return -1;
701         }
702         memset(&rec,'\0',sizeof(rec));
703         rec.rec_len = length - sizeof(rec);
704         if (tdb1_free(tdb, offset, &rec) == -1) {
705                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
706                            "tdb1_free_region: failed to add free record");
707                 return -1;
708         }
709         return 0;
710 }
711
712 /*
713   wipe the entire database, deleting all records. This can be done
714   very fast by using a allrecord lock. The entire data portion of the
715   file becomes a single entry in the freelist.
716
717   This code carefully steps around the recovery area, leaving it alone
718  */
719 int tdb1_wipe_all(struct tdb_context *tdb)
720 {
721         int i;
722         tdb1_off_t offset = 0;
723         ssize_t data_len;
724         tdb1_off_t recovery_head;
725         tdb1_len_t recovery_size = 0;
726
727         if (tdb_lockall(tdb) != TDB_SUCCESS) {
728                 return -1;
729         }
730
731
732         /* see if the tdb has a recovery area, and remember its size
733            if so. We don't want to lose this as otherwise each
734            tdb1_wipe_all() in a transaction will increase the size of
735            the tdb by the size of the recovery area */
736         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
737                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
738                            "tdb1_wipe_all: failed to read recovery head");
739                 goto failed;
740         }
741
742         if (recovery_head != 0) {
743                 struct tdb1_record rec;
744                 if (tdb->tdb1.io->tdb1_read(tdb, recovery_head, &rec, sizeof(rec), TDB1_DOCONV()) == -1) {
745                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
746                                    "tdb1_wipe_all: failed to read recovery record");
747                         return -1;
748                 }
749                 recovery_size = rec.rec_len + sizeof(rec);
750         }
751
752         /* wipe the hashes */
753         for (i=0;i<tdb->tdb1.header.hash_size;i++) {
754                 if (tdb1_ofs_write(tdb, TDB1_HASH_TOP(i), &offset) == -1) {
755                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
756                                    "tdb1_wipe_all: failed to write hash %d", i);
757                         goto failed;
758                 }
759         }
760
761         /* wipe the freelist */
762         if (tdb1_ofs_write(tdb, TDB1_FREELIST_TOP, &offset) == -1) {
763                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
764                            "tdb1_wipe_all: failed to write freelist");
765                 goto failed;
766         }
767
768         /* add all the rest of the file to the freelist, possibly leaving a gap
769            for the recovery area */
770         if (recovery_size == 0) {
771                 /* the simple case - the whole file can be used as a freelist */
772                 data_len = (tdb->file->map_size - TDB1_DATA_START(tdb->tdb1.header.hash_size));
773                 if (tdb1_free_region(tdb, TDB1_DATA_START(tdb->tdb1.header.hash_size), data_len) != 0) {
774                         goto failed;
775                 }
776         } else {
777                 /* we need to add two freelist entries - one on either
778                    side of the recovery area
779
780                    Note that we cannot shift the recovery area during
781                    this operation. Only the transaction.c code may
782                    move the recovery area or we risk subtle data
783                    corruption
784                 */
785                 data_len = (recovery_head - TDB1_DATA_START(tdb->tdb1.header.hash_size));
786                 if (tdb1_free_region(tdb, TDB1_DATA_START(tdb->tdb1.header.hash_size), data_len) != 0) {
787                         goto failed;
788                 }
789                 /* and the 2nd free list entry after the recovery area - if any */
790                 data_len = tdb->file->map_size - (recovery_head+recovery_size);
791                 if (tdb1_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
792                         goto failed;
793                 }
794         }
795
796         tdb1_increment_seqnum_nonblock(tdb);
797         tdb_unlockall(tdb);
798         return 0;
799
800 failed:
801         tdb_unlockall(tdb);
802         return -1;
803 }
804
805 /* Even on files, we can get partial writes due to signals. */
806 bool tdb1_write_all(int fd, const void *buf, size_t count)
807 {
808         while (count) {
809                 ssize_t ret;
810                 ret = write(fd, buf, count);
811                 if (ret < 0)
812                         return false;
813                 buf = (const char *)buf + ret;
814                 count -= ret;
815         }
816         return true;
817 }