]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
Don't fail mysteriously on existing trace file.
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* set when a prepare has already occurred */
125         bool prepared;
126         tdb_off_t magic_offset;
127
128         /* old file size before transaction */
129         tdb_len_t old_map_size;
130 };
131
132
133 /*
134   read while in a transaction. We need to check first if the data is in our list
135   of transaction elements, then if not do a real read
136 */
137 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
138                             tdb_len_t len, int cv)
139 {
140         uint32_t blk;
141
142         /* Only a commit is allowed on a prepared transaction */
143         if (tdb->transaction->prepared) {
144                 tdb->ecode = TDB_ERR_EINVAL;
145                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
146                 tdb->transaction->transaction_error = 1;
147                 return -1;
148         }
149
150         /* break it down into block sized ops */
151         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
152                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
153                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
154                         return -1;
155                 }
156                 len -= len2;
157                 off += len2;
158                 buf = (void *)(len2 + (char *)buf);
159         }
160
161         if (len == 0) {
162                 return 0;
163         }
164
165         blk = off / tdb->transaction->block_size;
166
167         /* see if we have it in the block list */
168         if (tdb->transaction->num_blocks <= blk ||
169             tdb->transaction->blocks[blk] == NULL) {
170                 /* nope, do a real read */
171                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
172                         goto fail;
173                 }
174                 return 0;
175         }
176
177         /* it is in the block list. Now check for the last block */
178         if (blk == tdb->transaction->num_blocks-1) {
179                 if (len > tdb->transaction->last_block_size) {
180                         goto fail;
181                 }
182         }
183         
184         /* now copy it out of this block */
185         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
186         if (cv) {
187                 tdb_convert(buf, len);
188         }
189         return 0;
190
191 fail:
192         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
193         tdb->ecode = TDB_ERR_IO;
194         tdb->transaction->transaction_error = 1;
195         return -1;
196 }
197
198
199 /*
200   write while in a transaction
201 */
202 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
203                              const void *buf, tdb_len_t len)
204 {
205         uint32_t blk;
206
207         /* Only a commit is allowed on a prepared transaction */
208         if (tdb->transaction->prepared) {
209                 tdb->ecode = TDB_ERR_EINVAL;
210                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
211                 tdb->transaction->transaction_error = 1;
212                 return -1;
213         }
214
215         /* if the write is to a hash head, then update the transaction
216            hash heads */
217         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
218             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
219                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
220                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
221         }
222
223         /* break it up into block sized chunks */
224         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
225                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
226                 if (transaction_write(tdb, off, buf, len2) != 0) {
227                         return -1;
228                 }
229                 len -= len2;
230                 off += len2;
231                 if (buf != NULL) {
232                         buf = (const void *)(len2 + (const char *)buf);
233                 }
234         }
235
236         if (len == 0) {
237                 return 0;
238         }
239
240         blk = off / tdb->transaction->block_size;
241         off = off % tdb->transaction->block_size;
242
243         if (tdb->transaction->num_blocks <= blk) {
244                 uint8_t **new_blocks;
245                 /* expand the blocks array */
246                 if (tdb->transaction->blocks == NULL) {
247                         new_blocks = (uint8_t **)malloc(
248                                 (blk+1)*sizeof(uint8_t *));
249                 } else {
250                         new_blocks = (uint8_t **)realloc(
251                                 tdb->transaction->blocks,
252                                 (blk+1)*sizeof(uint8_t *));
253                 }
254                 if (new_blocks == NULL) {
255                         tdb->ecode = TDB_ERR_OOM;
256                         goto fail;
257                 }
258                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
259                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
260                 tdb->transaction->blocks = new_blocks;
261                 tdb->transaction->num_blocks = blk+1;
262                 tdb->transaction->last_block_size = 0;
263         }
264
265         /* allocate and fill a block? */
266         if (tdb->transaction->blocks[blk] == NULL) {
267                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
268                 if (tdb->transaction->blocks[blk] == NULL) {
269                         tdb->ecode = TDB_ERR_OOM;
270                         tdb->transaction->transaction_error = 1;
271                         return -1;                      
272                 }
273                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
274                         tdb_len_t len2 = tdb->transaction->block_size;
275                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
276                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
277                         }
278                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
279                                                                    tdb->transaction->blocks[blk], 
280                                                                    len2, 0) != 0) {
281                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
282                                 tdb->ecode = TDB_ERR_IO;
283                                 goto fail;
284                         }
285                         if (blk == tdb->transaction->num_blocks-1) {
286                                 tdb->transaction->last_block_size = len2;
287                         }                       
288                 }
289         }
290         
291         /* overwrite part of an existing block */
292         if (buf == NULL) {
293                 memset(tdb->transaction->blocks[blk] + off, 0, len);
294         } else {
295                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
296         }
297         if (blk == tdb->transaction->num_blocks-1) {
298                 if (len + off > tdb->transaction->last_block_size) {
299                         tdb->transaction->last_block_size = len + off;
300                 }
301         }
302
303         return 0;
304
305 fail:
306         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
307                  (blk*tdb->transaction->block_size) + off, len));
308         tdb->transaction->transaction_error = 1;
309         return -1;
310 }
311
312
313 /*
314   write while in a transaction - this varient never expands the transaction blocks, it only
315   updates existing blocks. This means it cannot change the recovery size
316 */
317 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
318                                       const void *buf, tdb_len_t len)
319 {
320         uint32_t blk;
321
322         /* break it up into block sized chunks */
323         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
324                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
325                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
326                         return -1;
327                 }
328                 len -= len2;
329                 off += len2;
330                 if (buf != NULL) {
331                         buf = (const void *)(len2 + (const char *)buf);
332                 }
333         }
334
335         if (len == 0) {
336                 return 0;
337         }
338
339         blk = off / tdb->transaction->block_size;
340         off = off % tdb->transaction->block_size;
341
342         if (tdb->transaction->num_blocks <= blk ||
343             tdb->transaction->blocks[blk] == NULL) {
344                 return 0;
345         }
346
347         if (blk == tdb->transaction->num_blocks-1 &&
348             off + len > tdb->transaction->last_block_size) {
349                 if (off >= tdb->transaction->last_block_size) {
350                         return 0;
351                 }
352                 len = tdb->transaction->last_block_size - off;
353         }
354
355         /* overwrite part of an existing block */
356         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
357
358         return 0;
359 }
360
361
362 /*
363   accelerated hash chain head search, using the cached hash heads
364 */
365 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
366 {
367         uint32_t h = *chain;
368         for (;h < tdb->header.hash_size;h++) {
369                 /* the +1 takes account of the freelist */
370                 if (0 != tdb->transaction->hash_heads[h+1]) {
371                         break;
372                 }
373         }
374         (*chain) = h;
375 }
376
377 /*
378   out of bounds check during a transaction
379 */
380 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
381 {
382         if (len <= tdb->map_size) {
383                 return 0;
384         }
385         return TDB_ERRCODE(TDB_ERR_IO, -1);
386 }
387
388 /*
389   transaction version of tdb_expand().
390 */
391 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
392                                    tdb_off_t addition)
393 {
394         /* add a write to the transaction elements, so subsequent
395            reads see the zero data */
396         if (transaction_write(tdb, size, NULL, addition) != 0) {
397                 return -1;
398         }
399
400         return 0;
401 }
402
403 /*
404   brlock during a transaction - ignore them
405 */
406 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
407                               int rw_type, int lck_type, int probe, size_t len)
408 {
409         return 0;
410 }
411
412 static const struct tdb_methods transaction_methods = {
413         transaction_read,
414         transaction_write,
415         transaction_next_hash_chain,
416         transaction_oob,
417         transaction_expand_file,
418         transaction_brlock
419 };
420
421 /*
422   sync to disk
423 */
424 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
425 {       
426         if (tdb->flags & TDB_NOSYNC) {
427                 return 0;
428         }
429
430         if (fsync(tdb->fd) != 0) {
431                 tdb->ecode = TDB_ERR_IO;
432                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
433                 return -1;
434         }
435 #ifdef MS_SYNC
436         if (tdb->map_ptr) {
437                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
438                 if (msync(moffset + (char *)tdb->map_ptr, 
439                           length + (offset - moffset), MS_SYNC) != 0) {
440                         tdb->ecode = TDB_ERR_IO;
441                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
442                                  strerror(errno)));
443                         return -1;
444                 }
445         }
446 #endif
447         return 0;
448 }
449
450 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
451 {
452         int i, ret = 0;
453
454         if (tdb->transaction == NULL) {
455                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
456                 return -1;
457         }
458
459         if (tdb->transaction->nesting != 0) {
460                 tdb->transaction->transaction_error = 1;
461                 tdb->transaction->nesting--;
462                 return 0;
463         }               
464
465         tdb->map_size = tdb->transaction->old_map_size;
466
467         /* free all the transaction blocks */
468         for (i=0;i<tdb->transaction->num_blocks;i++) {
469                 if (tdb->transaction->blocks[i] != NULL) {
470                         free(tdb->transaction->blocks[i]);
471                 }
472         }
473         SAFE_FREE(tdb->transaction->blocks);
474
475         if (tdb->transaction->magic_offset) {
476                 const struct tdb_methods *methods = tdb->transaction->io_methods;
477                 uint32_t zero = 0;
478
479                 /* remove the recovery marker */
480                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
481                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
482                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
483                         ret = -1;
484                 }
485         }
486
487         /* remove any global lock created during the transaction */
488         if (tdb->global_lock.count != 0) {
489                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
490                 tdb->global_lock.count = 0;
491         }
492
493         /* remove any locks created during the transaction */
494         if (tdb->num_locks != 0) {
495                 for (i=0;i<tdb->num_lockrecs;i++) {
496                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
497                                    F_UNLCK,F_SETLKW, 0, 1);
498                 }
499                 tdb->num_locks = 0;
500                 tdb->num_lockrecs = 0;
501                 SAFE_FREE(tdb->lockrecs);
502         }
503
504         /* restore the normal io methods */
505         tdb->methods = tdb->transaction->io_methods;
506
507         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
508         tdb_transaction_unlock(tdb);
509         SAFE_FREE(tdb->transaction->hash_heads);
510         SAFE_FREE(tdb->transaction);
511         
512         return ret;
513 }
514
515 /*
516   start a tdb transaction. No token is returned, as only a single
517   transaction is allowed to be pending per tdb_context
518 */
519 int tdb_transaction_start(struct tdb_context *tdb)
520 {
521         /* some sanity checks */
522         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
523                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
524                 tdb->ecode = TDB_ERR_EINVAL;
525                 return -1;
526         }
527
528         /* cope with nested tdb_transaction_start() calls */
529         if (tdb->transaction != NULL) {
530                 tdb_trace(tdb, "tdb_transaction_start");
531                 if (!tdb->flags & TDB_NO_NESTING) {
532                         tdb->transaction->nesting++;
533                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
534                                  tdb->transaction->nesting));
535                         return 0;
536                 } else {
537                         tdb_transaction_cancel_internal(tdb);
538                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
539                 }
540         }
541
542         if (tdb->num_locks != 0 || tdb->global_lock.count) {
543                 /* the caller must not have any locks when starting a
544                    transaction as otherwise we'll be screwed by lack
545                    of nested locks in posix */
546                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
547                 tdb->ecode = TDB_ERR_LOCK;
548                 return -1;
549         }
550
551         if (tdb->travlocks.next != NULL) {
552                 /* you cannot use transactions inside a traverse (although you can use
553                    traverse inside a transaction) as otherwise you can end up with
554                    deadlock */
555                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
556                 tdb->ecode = TDB_ERR_LOCK;
557                 return -1;
558         }
559
560         tdb->transaction = (struct tdb_transaction *)
561                 calloc(sizeof(struct tdb_transaction), 1);
562         if (tdb->transaction == NULL) {
563                 tdb->ecode = TDB_ERR_OOM;
564                 return -1;
565         }
566
567         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
568         tdb->transaction->block_size = tdb->page_size;
569
570         /* get the transaction write lock. This is a blocking lock. As
571            discussed with Volker, there are a number of ways we could
572            make this async, which we will probably do in the future */
573         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
574                 SAFE_FREE(tdb->transaction->blocks);
575                 SAFE_FREE(tdb->transaction);
576                 return -1;
577         }
578         
579         /* get a read lock from the freelist to the end of file. This
580            is upgraded to a write lock during the commit */
581         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
582                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
583                 tdb->ecode = TDB_ERR_LOCK;
584                 goto fail;
585         }
586
587         /* setup a copy of the hash table heads so the hash scan in
588            traverse can be fast */
589         tdb->transaction->hash_heads = (uint32_t *)
590                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
591         if (tdb->transaction->hash_heads == NULL) {
592                 tdb->ecode = TDB_ERR_OOM;
593                 goto fail;
594         }
595         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
596                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
597                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
598                 tdb->ecode = TDB_ERR_IO;
599                 goto fail;
600         }
601
602         /* make sure we know about any file expansions already done by
603            anyone else */
604         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
605         tdb->transaction->old_map_size = tdb->map_size;
606
607         /* finally hook the io methods, replacing them with
608            transaction specific methods */
609         tdb->transaction->io_methods = tdb->methods;
610         tdb->methods = &transaction_methods;
611
612         /* Trace at the end, so we get sequence number correct. */
613         tdb_trace(tdb, "tdb_transaction_start");
614         return 0;
615         
616 fail:
617         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
618         tdb_transaction_unlock(tdb);
619         SAFE_FREE(tdb->transaction->blocks);
620         SAFE_FREE(tdb->transaction->hash_heads);
621         SAFE_FREE(tdb->transaction);
622         return -1;
623 }
624
625
/*
  public entry point for cancelling the current transaction: records
  a trace entry, then hands over to tdb_transaction_cancel_internal()
  and returns its result
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	int rc;

	tdb_trace(tdb, "tdb_transaction_cancel");
	rc = tdb_transaction_cancel_internal(tdb);
	return rc;
}
634
635 /*
636   work out how much space the linearised recovery data will consume
637 */
638 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
639 {
640         tdb_len_t recovery_size = 0;
641         int i;
642
643         recovery_size = sizeof(uint32_t);
644         for (i=0;i<tdb->transaction->num_blocks;i++) {
645                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
646                         break;
647                 }
648                 if (tdb->transaction->blocks[i] == NULL) {
649                         continue;
650                 }
651                 recovery_size += 2*sizeof(tdb_off_t);
652                 if (i == tdb->transaction->num_blocks-1) {
653                         recovery_size += tdb->transaction->last_block_size;
654                 } else {
655                         recovery_size += tdb->transaction->block_size;
656                 }
657         }       
658
659         return recovery_size;
660 }
661
662 /*
663   allocate the recovery area, or use an existing recovery area if it is
664   large enough
665 */
666 static int tdb_recovery_allocate(struct tdb_context *tdb, 
667                                  tdb_len_t *recovery_size,
668                                  tdb_off_t *recovery_offset,
669                                  tdb_len_t *recovery_max_size)
670 {
671         struct list_struct rec;
672         const struct tdb_methods *methods = tdb->transaction->io_methods;
673         tdb_off_t recovery_head;
674
675         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
676                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
677                 return -1;
678         }
679
680         rec.rec_len = 0;
681
682         if (recovery_head != 0 && 
683             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
684                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
685                 return -1;
686         }
687
688         *recovery_size = tdb_recovery_size(tdb);
689
690         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
691                 /* it fits in the existing area */
692                 *recovery_max_size = rec.rec_len;
693                 *recovery_offset = recovery_head;
694                 return 0;
695         }
696
697         /* we need to free up the old recovery area, then allocate a
698            new one at the end of the file. Note that we cannot use
699            tdb_allocate() to allocate the new one as that might return
700            us an area that is being currently used (as of the start of
701            the transaction) */
702         if (recovery_head != 0) {
703                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
704                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
705                         return -1;
706                 }
707         }
708
709         /* the tdb_free() call might have increased the recovery size */
710         *recovery_size = tdb_recovery_size(tdb);
711
712         /* round up to a multiple of page size */
713         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
714         *recovery_offset = tdb->map_size;
715         recovery_head = *recovery_offset;
716
717         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
718                                      (tdb->map_size - tdb->transaction->old_map_size) +
719                                      sizeof(rec) + *recovery_max_size) == -1) {
720                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
721                 return -1;
722         }
723
724         /* remap the file (if using mmap) */
725         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
726
727         /* we have to reset the old map size so that we don't try to expand the file
728            again in the transaction commit, which would destroy the recovery area */
729         tdb->transaction->old_map_size = tdb->map_size;
730
731         /* write the recovery header offset and sync - we can sync without a race here
732            as the magic ptr in the recovery record has not been set */
733         CONVERT(recovery_head);
734         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
735                                &recovery_head, sizeof(tdb_off_t)) == -1) {
736                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
737                 return -1;
738         }
739         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
740                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
741                 return -1;
742         }
743
744         return 0;
745 }
746
747
748 /*
749   setup the recovery data that will be used on a crash during commit
750 */
751 static int transaction_setup_recovery(struct tdb_context *tdb, 
752                                       tdb_off_t *magic_offset)
753 {
754         tdb_len_t recovery_size;
755         unsigned char *data, *p;
756         const struct tdb_methods *methods = tdb->transaction->io_methods;
757         struct list_struct *rec;
758         tdb_off_t recovery_offset, recovery_max_size;
759         tdb_off_t old_map_size = tdb->transaction->old_map_size;
760         uint32_t magic, tailer;
761         int i;
762
763         /*
764           check that the recovery area has enough space
765         */
766         if (tdb_recovery_allocate(tdb, &recovery_size, 
767                                   &recovery_offset, &recovery_max_size) == -1) {
768                 return -1;
769         }
770
771         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
772         if (data == NULL) {
773                 tdb->ecode = TDB_ERR_OOM;
774                 return -1;
775         }
776
777         rec = (struct list_struct *)data;
778         memset(rec, 0, sizeof(*rec));
779
780         rec->magic    = 0;
781         rec->data_len = recovery_size;
782         rec->rec_len  = recovery_max_size;
783         rec->key_len  = old_map_size;
784         CONVERT(rec);
785
786         /* build the recovery data into a single blob to allow us to do a single
787            large write, which should be more efficient */
788         p = data + sizeof(*rec);
789         for (i=0;i<tdb->transaction->num_blocks;i++) {
790                 tdb_off_t offset;
791                 tdb_len_t length;
792
793                 if (tdb->transaction->blocks[i] == NULL) {
794                         continue;
795                 }
796
797                 offset = i * tdb->transaction->block_size;
798                 length = tdb->transaction->block_size;
799                 if (i == tdb->transaction->num_blocks-1) {
800                         length = tdb->transaction->last_block_size;
801                 }
802                 
803                 if (offset >= old_map_size) {
804                         continue;
805                 }
806                 if (offset + length > tdb->transaction->old_map_size) {
807                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
808                         free(data);
809                         tdb->ecode = TDB_ERR_CORRUPT;
810                         return -1;
811                 }
812                 memcpy(p, &offset, 4);
813                 memcpy(p+4, &length, 4);
814                 if (DOCONV()) {
815                         tdb_convert(p, 8);
816                 }
817                 /* the recovery area contains the old data, not the
818                    new data, so we have to call the original tdb_read
819                    method to get it */
820                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
821                         free(data);
822                         tdb->ecode = TDB_ERR_IO;
823                         return -1;
824                 }
825                 p += 8 + length;
826         }
827
828         /* and the tailer */
829         tailer = sizeof(*rec) + recovery_max_size;
830         memcpy(p, &tailer, 4);
831         CONVERT(p);
832
833         /* write the recovery data to the recovery area */
834         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
835                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
836                 free(data);
837                 tdb->ecode = TDB_ERR_IO;
838                 return -1;
839         }
840         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
841                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
842                 free(data);
843                 tdb->ecode = TDB_ERR_IO;
844                 return -1;
845         }
846
847         /* as we don't have ordered writes, we have to sync the recovery
848            data before we update the magic to indicate that the recovery
849            data is present */
850         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
851                 free(data);
852                 return -1;
853         }
854
855         free(data);
856
857         magic = TDB_RECOVERY_MAGIC;
858         CONVERT(magic);
859
860         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
861
862         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
863                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
864                 tdb->ecode = TDB_ERR_IO;
865                 return -1;
866         }
867         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
868                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
869                 tdb->ecode = TDB_ERR_IO;
870                 return -1;
871         }
872
873         /* ensure the recovery magic marker is on disk */
874         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
875                 return -1;
876         }
877
878         return 0;
879 }
880
881 static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
882 {       
883         const struct tdb_methods *methods;
884
885         if (tdb->transaction == NULL) {
886                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
887                 return -1;
888         }
889
890         if (tdb->transaction->prepared) {
891                 tdb->ecode = TDB_ERR_EINVAL;
892                 tdb_transaction_cancel(tdb);
893                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
894                 return -1;
895         }
896
897         if (tdb->transaction->transaction_error) {
898                 tdb->ecode = TDB_ERR_IO;
899                 tdb_transaction_cancel_internal(tdb);
900                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
901                 return -1;
902         }
903
904
905         if (tdb->transaction->nesting != 0) {
906                 tdb->transaction->nesting--;
907                 return 0;
908         }               
909
910 #ifdef TDB_TRACE
911         /* store seqnum now, before reading becomes illegal. */
912         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
913 #endif
914
915         /* check for a null transaction */
916         if (tdb->transaction->blocks == NULL) {
917                 return 0;
918         }
919
920         methods = tdb->transaction->io_methods;
921         
922         /* if there are any locks pending then the caller has not
923            nested their locks properly, so fail the transaction */
924         if (tdb->num_locks || tdb->global_lock.count) {
925                 tdb->ecode = TDB_ERR_LOCK;
926                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
927                 tdb_transaction_cancel_internal(tdb);
928                 return -1;
929         }
930
931         /* upgrade the main transaction lock region to a write lock */
932         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
933                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
934                 tdb->ecode = TDB_ERR_LOCK;
935                 tdb_transaction_cancel_internal(tdb);
936                 return -1;
937         }
938
939         /* get the global lock - this prevents new users attaching to the database
940            during the commit */
941         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
942                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
943                 tdb->ecode = TDB_ERR_LOCK;
944                 tdb_transaction_cancel_internal(tdb);
945                 return -1;
946         }
947
948         if (!(tdb->flags & TDB_NOSYNC)) {
949                 /* write the recovery data to the end of the file */
950                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
951                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
952                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
953                         tdb_transaction_cancel_internal(tdb);
954                         return -1;
955                 }
956         }
957
958         tdb->transaction->prepared = true;
959
960         /* expand the file to the new size if needed */
961         if (tdb->map_size != tdb->transaction->old_map_size) {
962                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
963                                              tdb->map_size - 
964                                              tdb->transaction->old_map_size) == -1) {
965                         tdb->ecode = TDB_ERR_IO;
966                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
967                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
968                         tdb_transaction_cancel_internal(tdb);
969                         return -1;
970                 }
971                 tdb->map_size = tdb->transaction->old_map_size;
972                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
973         }
974
975         /* Keep the global lock until the actual commit */
976
977         return 0;
978 }
979
/*
   prepare to commit the current transaction

   Public entry point: emits a tdb_trace() record for the call, then
   returns whatever tdb_transaction_prepare_commit_internal() returns.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
        int ret;

        tdb_trace(tdb, "tdb_transaction_prepare_commit");
        ret = tdb_transaction_prepare_commit_internal(tdb);

        return ret;
}
988
989 /*
990   commit the current transaction
991 */
992 int tdb_transaction_commit(struct tdb_context *tdb)
993 {       
994         const struct tdb_methods *methods;
995         int i;
996
997         if (tdb->transaction == NULL) {
998                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
999                 return -1;
1000         }
1001
1002         /* If we've prepared, can't read seqnum. */
1003         if (tdb->transaction->prepared) {
1004                 tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
1005                                  "tdb_transaction_commit");
1006         } else {
1007                 tdb_trace(tdb, "tdb_transaction_commit");
1008         }
1009
1010         if (tdb->transaction->transaction_error) {
1011                 tdb->ecode = TDB_ERR_IO;
1012                 tdb_transaction_cancel(tdb);
1013                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1014                 return -1;
1015         }
1016
1017
1018         if (tdb->transaction->nesting != 0) {
1019                 tdb->transaction->nesting--;
1020                 return 0;
1021         }
1022
1023         /* check for a null transaction */
1024         if (tdb->transaction->blocks == NULL) {
1025                 tdb_transaction_cancel_internal(tdb);
1026                 return 0;
1027         }
1028
1029         if (!tdb->transaction->prepared) {
1030                 int ret = tdb_transaction_prepare_commit_internal(tdb);
1031                 if (ret)
1032                         return ret;
1033         }
1034
1035         methods = tdb->transaction->io_methods;
1036
1037         /* perform all the writes */
1038         for (i=0;i<tdb->transaction->num_blocks;i++) {
1039                 tdb_off_t offset;
1040                 tdb_len_t length;
1041
1042                 if (tdb->transaction->blocks[i] == NULL) {
1043                         continue;
1044                 }
1045
1046                 offset = i * tdb->transaction->block_size;
1047                 length = tdb->transaction->block_size;
1048                 if (i == tdb->transaction->num_blocks-1) {
1049                         length = tdb->transaction->last_block_size;
1050                 }
1051
1052                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1053                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1054                         
1055                         /* we've overwritten part of the data and
1056                            possibly expanded the file, so we need to
1057                            run the crash recovery code */
1058                         tdb->methods = methods;
1059                         tdb_transaction_recover(tdb); 
1060
1061                         tdb_transaction_cancel_internal(tdb);
1062                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1063
1064                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1065                         return -1;
1066                 }
1067                 SAFE_FREE(tdb->transaction->blocks[i]);
1068         } 
1069
1070         SAFE_FREE(tdb->transaction->blocks);
1071         tdb->transaction->num_blocks = 0;
1072
1073         /* ensure the new data is on disk */
1074         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1075                 return -1;
1076         }
1077
1078         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1079
1080         /*
1081           TODO: maybe write to some dummy hdr field, or write to magic
1082           offset without mmap, before the last sync, instead of the
1083           utime() call
1084         */
1085
1086         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1087            don't change the mtime of the file, this means the file may
1088            not be backed up (as tdb rounding to block sizes means that
1089            file size changes are quite rare too). The following forces
1090            mtime changes when a transaction completes */
1091 #if HAVE_UTIME
1092         utime(tdb->name, NULL);
1093 #endif
1094
1095         /* use a transaction cancel to free memory and remove the
1096            transaction locks */
1097         tdb_transaction_cancel_internal(tdb);
1098
1099         return 0;
1100 }
1101
1102
1103 /*
1104   recover from an aborted transaction. Must be called with exclusive
1105   database write access already established (including the global
1106   lock to prevent new processes attaching)
1107 */
1108 int tdb_transaction_recover(struct tdb_context *tdb)
1109 {
1110         tdb_off_t recovery_head, recovery_eof;
1111         unsigned char *data, *p;
1112         uint32_t zero = 0;
1113         struct list_struct rec;
1114
1115         /* find the recovery area */
1116         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1117                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1118                 tdb->ecode = TDB_ERR_IO;
1119                 return -1;
1120         }
1121
1122         if (recovery_head == 0) {
1123                 /* we have never allocated a recovery record */
1124                 return 0;
1125         }
1126
1127         /* read the recovery record */
1128         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1129                                    sizeof(rec), DOCONV()) == -1) {
1130                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1131                 tdb->ecode = TDB_ERR_IO;
1132                 return -1;
1133         }
1134
1135         if (rec.magic != TDB_RECOVERY_MAGIC) {
1136                 /* there is no valid recovery data */
1137                 return 0;
1138         }
1139
1140         if (tdb->read_only) {
1141                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1142                 tdb->ecode = TDB_ERR_CORRUPT;
1143                 return -1;
1144         }
1145
1146         recovery_eof = rec.key_len;
1147
1148         data = (unsigned char *)malloc(rec.data_len);
1149         if (data == NULL) {
1150                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1151                 tdb->ecode = TDB_ERR_OOM;
1152                 return -1;
1153         }
1154
1155         /* read the full recovery data */
1156         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1157                                    rec.data_len, 0) == -1) {
1158                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1159                 tdb->ecode = TDB_ERR_IO;
1160                 return -1;
1161         }
1162
1163         /* recover the file data */
1164         p = data;
1165         while (p+8 < data + rec.data_len) {
1166                 uint32_t ofs, len;
1167                 if (DOCONV()) {
1168                         tdb_convert(p, 8);
1169                 }
1170                 memcpy(&ofs, p, 4);
1171                 memcpy(&len, p+4, 4);
1172
1173                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1174                         free(data);
1175                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1176                         tdb->ecode = TDB_ERR_IO;
1177                         return -1;
1178                 }
1179                 p += 8 + len;
1180         }
1181
1182         free(data);
1183
1184         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1185                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1186                 tdb->ecode = TDB_ERR_IO;
1187                 return -1;
1188         }
1189
1190         /* if the recovery area is after the recovered eof then remove it */
1191         if (recovery_eof <= recovery_head) {
1192                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1193                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1194                         tdb->ecode = TDB_ERR_IO;
1195                         return -1;                      
1196                 }
1197         }
1198
1199         /* remove the recovery magic */
1200         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1201                           &zero) == -1) {
1202                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1203                 tdb->ecode = TDB_ERR_IO;
1204                 return -1;                      
1205         }
1206         
1207         /* reduce the file size to the old size */
1208         tdb_munmap(tdb);
1209         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1210                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1211                 tdb->ecode = TDB_ERR_IO;
1212                 return -1;                      
1213         }
1214         tdb->map_size = recovery_eof;
1215         tdb_mmap(tdb);
1216
1217         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1218                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1219                 tdb->ecode = TDB_ERR_IO;
1220                 return -1;
1221         }
1222
1223         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1224                  recovery_eof));
1225
1226         /* all done */
1227         return 0;
1228 }