]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
Tracing for tdb operations.
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb_open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* old file size before transaction */
125         tdb_len_t old_map_size;
126 };
127
128
129 /*
130   read while in a transaction. We need to check first if the data is in our list
131   of transaction elements, then if not do a real read
132 */
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
134                             tdb_len_t len, int cv)
135 {
136         uint32_t blk;
137
138         /* break it down into block sized ops */
139         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
140                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
142                         return -1;
143                 }
144                 len -= len2;
145                 off += len2;
146                 buf = (void *)(len2 + (char *)buf);
147         }
148
149         if (len == 0) {
150                 return 0;
151         }
152
153         blk = off / tdb->transaction->block_size;
154
155         /* see if we have it in the block list */
156         if (tdb->transaction->num_blocks <= blk ||
157             tdb->transaction->blocks[blk] == NULL) {
158                 /* nope, do a real read */
159                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
160                         goto fail;
161                 }
162                 return 0;
163         }
164
165         /* it is in the block list. Now check for the last block */
166         if (blk == tdb->transaction->num_blocks-1) {
167                 if (len > tdb->transaction->last_block_size) {
168                         goto fail;
169                 }
170         }
171         
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
174         if (cv) {
175                 tdb_convert(buf, len);
176         }
177         return 0;
178
179 fail:
180         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181         tdb->ecode = TDB_ERR_IO;
182         tdb->transaction->transaction_error = 1;
183         return -1;
184 }
185
186
187 /*
188   write while in a transaction
189 */
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
191                              const void *buf, tdb_len_t len)
192 {
193         uint32_t blk;
194
195         /* if the write is to a hash head, then update the transaction
196            hash heads */
197         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
201         }
202
203         /* break it up into block sized chunks */
204         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
205                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206                 if (transaction_write(tdb, off, buf, len2) != 0) {
207                         return -1;
208                 }
209                 len -= len2;
210                 off += len2;
211                 if (buf != NULL) {
212                         buf = (const void *)(len2 + (const char *)buf);
213                 }
214         }
215
216         if (len == 0) {
217                 return 0;
218         }
219
220         blk = off / tdb->transaction->block_size;
221         off = off % tdb->transaction->block_size;
222
223         if (tdb->transaction->num_blocks <= blk) {
224                 uint8_t **new_blocks;
225                 /* expand the blocks array */
226                 if (tdb->transaction->blocks == NULL) {
227                         new_blocks = (uint8_t **)malloc(
228                                 (blk+1)*sizeof(uint8_t *));
229                 } else {
230                         new_blocks = (uint8_t **)realloc(
231                                 tdb->transaction->blocks,
232                                 (blk+1)*sizeof(uint8_t *));
233                 }
234                 if (new_blocks == NULL) {
235                         tdb->ecode = TDB_ERR_OOM;
236                         goto fail;
237                 }
238                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
239                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240                 tdb->transaction->blocks = new_blocks;
241                 tdb->transaction->num_blocks = blk+1;
242                 tdb->transaction->last_block_size = 0;
243         }
244
245         /* allocate and fill a block? */
246         if (tdb->transaction->blocks[blk] == NULL) {
247                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248                 if (tdb->transaction->blocks[blk] == NULL) {
249                         tdb->ecode = TDB_ERR_OOM;
250                         tdb->transaction->transaction_error = 1;
251                         return -1;                      
252                 }
253                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254                         tdb_len_t len2 = tdb->transaction->block_size;
255                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
257                         }
258                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
259                                                                    tdb->transaction->blocks[blk], 
260                                                                    len2, 0) != 0) {
261                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
262                                 tdb->ecode = TDB_ERR_IO;
263                                 goto fail;
264                         }
265                         if (blk == tdb->transaction->num_blocks-1) {
266                                 tdb->transaction->last_block_size = len2;
267                         }                       
268                 }
269         }
270         
271         /* overwrite part of an existing block */
272         if (buf == NULL) {
273                 memset(tdb->transaction->blocks[blk] + off, 0, len);
274         } else {
275                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
276         }
277         if (blk == tdb->transaction->num_blocks-1) {
278                 if (len + off > tdb->transaction->last_block_size) {
279                         tdb->transaction->last_block_size = len + off;
280                 }
281         }
282
283         return 0;
284
285 fail:
286         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
287                  (blk*tdb->transaction->block_size) + off, len));
288         tdb->transaction->transaction_error = 1;
289         return -1;
290 }
291
292
293 /*
294   write while in a transaction - this varient never expands the transaction blocks, it only
295   updates existing blocks. This means it cannot change the recovery size
296 */
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
298                                       const void *buf, tdb_len_t len)
299 {
300         uint32_t blk;
301
302         /* break it up into block sized chunks */
303         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
304                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
306                         return -1;
307                 }
308                 len -= len2;
309                 off += len2;
310                 if (buf != NULL) {
311                         buf = (const void *)(len2 + (const char *)buf);
312                 }
313         }
314
315         if (len == 0) {
316                 return 0;
317         }
318
319         blk = off / tdb->transaction->block_size;
320         off = off % tdb->transaction->block_size;
321
322         if (tdb->transaction->num_blocks <= blk ||
323             tdb->transaction->blocks[blk] == NULL) {
324                 return 0;
325         }
326
327         if (blk == tdb->transaction->num_blocks-1 &&
328             off + len > tdb->transaction->last_block_size) {
329                 if (off >= tdb->transaction->last_block_size) {
330                         return 0;
331                 }
332                 len = tdb->transaction->last_block_size - off;
333         }
334
335         /* overwrite part of an existing block */
336         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
337
338         return 0;
339 }
340
341
342 /*
343   accelerated hash chain head search, using the cached hash heads
344 */
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
346 {
347         uint32_t h = *chain;
348         for (;h < tdb->header.hash_size;h++) {
349                 /* the +1 takes account of the freelist */
350                 if (0 != tdb->transaction->hash_heads[h+1]) {
351                         break;
352                 }
353         }
354         (*chain) = h;
355 }
356
357 /*
358   out of bounds check during a transaction
359 */
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
361 {
362         if (len <= tdb->map_size) {
363                 return 0;
364         }
365         return TDB_ERRCODE(TDB_ERR_IO, -1);
366 }
367
368 /*
369   transaction version of tdb_expand().
370 */
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
372                                    tdb_off_t addition)
373 {
374         /* add a write to the transaction elements, so subsequent
375            reads see the zero data */
376         if (transaction_write(tdb, size, NULL, addition) != 0) {
377                 return -1;
378         }
379
380         return 0;
381 }
382
383 /*
384   brlock during a transaction - ignore them
385 */
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
387                               int rw_type, int lck_type, int probe, size_t len)
388 {
389         return 0;
390 }
391
392 static const struct tdb_methods transaction_methods = {
393         transaction_read,
394         transaction_write,
395         transaction_next_hash_chain,
396         transaction_oob,
397         transaction_expand_file,
398         transaction_brlock
399 };
400
401
402 /*
403   start a tdb transaction. No token is returned, as only a single
404   transaction is allowed to be pending per tdb_context
405 */
406 int tdb_transaction_start(struct tdb_context *tdb)
407 {
408         tdb_trace(tdb, "tdb_transaction_start\n");
409
410         /* some sanity checks */
411         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
412                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
413                 tdb->ecode = TDB_ERR_EINVAL;
414                 return -1;
415         }
416
417         /* cope with nested tdb_transaction_start() calls */
418         if (tdb->transaction != NULL) {
419                 if (!tdb->flags & TDB_NO_NESTING) {
420                         tdb->transaction->nesting++;
421                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
422                                  tdb->transaction->nesting));
423                         return 0;
424                 } else {
425                         tdb_transaction_cancel(tdb);
426                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
427                 }
428         }
429
430         if (tdb->num_locks != 0 || tdb->global_lock.count) {
431                 /* the caller must not have any locks when starting a
432                    transaction as otherwise we'll be screwed by lack
433                    of nested locks in posix */
434                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
435                 tdb->ecode = TDB_ERR_LOCK;
436                 return -1;
437         }
438
439         if (tdb->travlocks.next != NULL) {
440                 /* you cannot use transactions inside a traverse (although you can use
441                    traverse inside a transaction) as otherwise you can end up with
442                    deadlock */
443                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
444                 tdb->ecode = TDB_ERR_LOCK;
445                 return -1;
446         }
447
448         tdb->transaction = (struct tdb_transaction *)
449                 calloc(sizeof(struct tdb_transaction), 1);
450         if (tdb->transaction == NULL) {
451                 tdb->ecode = TDB_ERR_OOM;
452                 return -1;
453         }
454
455         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
456         tdb->transaction->block_size = tdb->page_size;
457
458         /* get the transaction write lock. This is a blocking lock. As
459            discussed with Volker, there are a number of ways we could
460            make this async, which we will probably do in the future */
461         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
462                 SAFE_FREE(tdb->transaction->blocks);
463                 SAFE_FREE(tdb->transaction);
464                 return -1;
465         }
466         
467         /* get a read lock from the freelist to the end of file. This
468            is upgraded to a write lock during the commit */
469         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
470                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
471                 tdb->ecode = TDB_ERR_LOCK;
472                 goto fail;
473         }
474
475         /* setup a copy of the hash table heads so the hash scan in
476            traverse can be fast */
477         tdb->transaction->hash_heads = (uint32_t *)
478                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
479         if (tdb->transaction->hash_heads == NULL) {
480                 tdb->ecode = TDB_ERR_OOM;
481                 goto fail;
482         }
483         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
484                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
485                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
486                 tdb->ecode = TDB_ERR_IO;
487                 goto fail;
488         }
489
490         /* make sure we know about any file expansions already done by
491            anyone else */
492         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
493         tdb->transaction->old_map_size = tdb->map_size;
494
495         /* finally hook the io methods, replacing them with
496            transaction specific methods */
497         tdb->transaction->io_methods = tdb->methods;
498         tdb->methods = &transaction_methods;
499
500         return 0;
501         
502 fail:
503         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
504         tdb_transaction_unlock(tdb);
505         SAFE_FREE(tdb->transaction->blocks);
506         SAFE_FREE(tdb->transaction->hash_heads);
507         SAFE_FREE(tdb->transaction);
508         return -1;
509 }
510
511
512 /*
513   cancel the current transaction
514 */
515 int tdb_transaction_cancel(struct tdb_context *tdb)
516 {       
517         int i;
518
519         tdb_trace(tdb, "tdb_transaction_cancel\n");
520         if (tdb->transaction == NULL) {
521                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
522                 return -1;
523         }
524
525         if (tdb->transaction->nesting != 0) {
526                 tdb->transaction->transaction_error = 1;
527                 tdb->transaction->nesting--;
528                 return 0;
529         }               
530
531         tdb->map_size = tdb->transaction->old_map_size;
532
533         /* free all the transaction blocks */
534         for (i=0;i<tdb->transaction->num_blocks;i++) {
535                 if (tdb->transaction->blocks[i] != NULL) {
536                         free(tdb->transaction->blocks[i]);
537                 }
538         }
539         SAFE_FREE(tdb->transaction->blocks);
540
541         /* remove any global lock created during the transaction */
542         if (tdb->global_lock.count != 0) {
543                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
544                 tdb->global_lock.count = 0;
545         }
546
547         /* remove any locks created during the transaction */
548         if (tdb->num_locks != 0) {
549                 for (i=0;i<tdb->num_lockrecs;i++) {
550                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
551                                    F_UNLCK,F_SETLKW, 0, 1);
552                 }
553                 tdb->num_locks = 0;
554                 tdb->num_lockrecs = 0;
555                 SAFE_FREE(tdb->lockrecs);
556         }
557
558         /* restore the normal io methods */
559         tdb->methods = tdb->transaction->io_methods;
560
561         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
562         tdb_transaction_unlock(tdb);
563         SAFE_FREE(tdb->transaction->hash_heads);
564         SAFE_FREE(tdb->transaction);
565         
566         return 0;
567 }
568
569 /*
570   sync to disk
571 */
572 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
573 {       
574         if (fsync(tdb->fd) != 0) {
575                 tdb->ecode = TDB_ERR_IO;
576                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
577                 return -1;
578         }
579 #ifdef MS_SYNC
580         if (tdb->map_ptr) {
581                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
582                 if (msync(moffset + (char *)tdb->map_ptr, 
583                           length + (offset - moffset), MS_SYNC) != 0) {
584                         tdb->ecode = TDB_ERR_IO;
585                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
586                                  strerror(errno)));
587                         return -1;
588                 }
589         }
590 #endif
591         return 0;
592 }
593
594
595 /*
596   work out how much space the linearised recovery data will consume
597 */
598 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
599 {
600         tdb_len_t recovery_size = 0;
601         int i;
602
603         recovery_size = sizeof(uint32_t);
604         for (i=0;i<tdb->transaction->num_blocks;i++) {
605                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
606                         break;
607                 }
608                 if (tdb->transaction->blocks[i] == NULL) {
609                         continue;
610                 }
611                 recovery_size += 2*sizeof(tdb_off_t);
612                 if (i == tdb->transaction->num_blocks-1) {
613                         recovery_size += tdb->transaction->last_block_size;
614                 } else {
615                         recovery_size += tdb->transaction->block_size;
616                 }
617         }       
618
619         return recovery_size;
620 }
621
622 /*
623   allocate the recovery area, or use an existing recovery area if it is
624   large enough
625 */
626 static int tdb_recovery_allocate(struct tdb_context *tdb, 
627                                  tdb_len_t *recovery_size,
628                                  tdb_off_t *recovery_offset,
629                                  tdb_len_t *recovery_max_size)
630 {
631         struct list_struct rec;
632         const struct tdb_methods *methods = tdb->transaction->io_methods;
633         tdb_off_t recovery_head;
634
635         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
636                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
637                 return -1;
638         }
639
640         rec.rec_len = 0;
641
642         if (recovery_head != 0 && 
643             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
644                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
645                 return -1;
646         }
647
648         *recovery_size = tdb_recovery_size(tdb);
649
650         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
651                 /* it fits in the existing area */
652                 *recovery_max_size = rec.rec_len;
653                 *recovery_offset = recovery_head;
654                 return 0;
655         }
656
657         /* we need to free up the old recovery area, then allocate a
658            new one at the end of the file. Note that we cannot use
659            tdb_allocate() to allocate the new one as that might return
660            us an area that is being currently used (as of the start of
661            the transaction) */
662         if (recovery_head != 0) {
663                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
664                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
665                         return -1;
666                 }
667         }
668
669         /* the tdb_free() call might have increased the recovery size */
670         *recovery_size = tdb_recovery_size(tdb);
671
672         /* round up to a multiple of page size */
673         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
674         *recovery_offset = tdb->map_size;
675         recovery_head = *recovery_offset;
676
677         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
678                                      (tdb->map_size - tdb->transaction->old_map_size) +
679                                      sizeof(rec) + *recovery_max_size) == -1) {
680                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
681                 return -1;
682         }
683
684         /* remap the file (if using mmap) */
685         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
686
687         /* we have to reset the old map size so that we don't try to expand the file
688            again in the transaction commit, which would destroy the recovery area */
689         tdb->transaction->old_map_size = tdb->map_size;
690
691         /* write the recovery header offset and sync - we can sync without a race here
692            as the magic ptr in the recovery record has not been set */
693         CONVERT(recovery_head);
694         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
695                                &recovery_head, sizeof(tdb_off_t)) == -1) {
696                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
697                 return -1;
698         }
699         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
700                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
701                 return -1;
702         }
703
704         return 0;
705 }
706
707
708 /*
709   setup the recovery data that will be used on a crash during commit
710 */
711 static int transaction_setup_recovery(struct tdb_context *tdb, 
712                                       tdb_off_t *magic_offset)
713 {
714         tdb_len_t recovery_size;
715         unsigned char *data, *p;
716         const struct tdb_methods *methods = tdb->transaction->io_methods;
717         struct list_struct *rec;
718         tdb_off_t recovery_offset, recovery_max_size;
719         tdb_off_t old_map_size = tdb->transaction->old_map_size;
720         uint32_t magic, tailer;
721         int i;
722
723         /*
724           check that the recovery area has enough space
725         */
726         if (tdb_recovery_allocate(tdb, &recovery_size, 
727                                   &recovery_offset, &recovery_max_size) == -1) {
728                 return -1;
729         }
730
731         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
732         if (data == NULL) {
733                 tdb->ecode = TDB_ERR_OOM;
734                 return -1;
735         }
736
737         rec = (struct list_struct *)data;
738         memset(rec, 0, sizeof(*rec));
739
740         rec->magic    = 0;
741         rec->data_len = recovery_size;
742         rec->rec_len  = recovery_max_size;
743         rec->key_len  = old_map_size;
744         CONVERT(rec);
745
746         /* build the recovery data into a single blob to allow us to do a single
747            large write, which should be more efficient */
748         p = data + sizeof(*rec);
749         for (i=0;i<tdb->transaction->num_blocks;i++) {
750                 tdb_off_t offset;
751                 tdb_len_t length;
752
753                 if (tdb->transaction->blocks[i] == NULL) {
754                         continue;
755                 }
756
757                 offset = i * tdb->transaction->block_size;
758                 length = tdb->transaction->block_size;
759                 if (i == tdb->transaction->num_blocks-1) {
760                         length = tdb->transaction->last_block_size;
761                 }
762                 
763                 if (offset >= old_map_size) {
764                         continue;
765                 }
766                 if (offset + length > tdb->transaction->old_map_size) {
767                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
768                         free(data);
769                         tdb->ecode = TDB_ERR_CORRUPT;
770                         return -1;
771                 }
772                 memcpy(p, &offset, 4);
773                 memcpy(p+4, &length, 4);
774                 if (DOCONV()) {
775                         tdb_convert(p, 8);
776                 }
777                 /* the recovery area contains the old data, not the
778                    new data, so we have to call the original tdb_read
779                    method to get it */
780                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
781                         free(data);
782                         tdb->ecode = TDB_ERR_IO;
783                         return -1;
784                 }
785                 p += 8 + length;
786         }
787
788         /* and the tailer */
789         tailer = sizeof(*rec) + recovery_max_size;
790         memcpy(p, &tailer, 4);
791         CONVERT(p);
792
793         /* write the recovery data to the recovery area */
794         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
795                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
796                 free(data);
797                 tdb->ecode = TDB_ERR_IO;
798                 return -1;
799         }
800         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
801                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
802                 free(data);
803                 tdb->ecode = TDB_ERR_IO;
804                 return -1;
805         }
806
807         /* as we don't have ordered writes, we have to sync the recovery
808            data before we update the magic to indicate that the recovery
809            data is present */
810         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
811                 free(data);
812                 return -1;
813         }
814
815         free(data);
816
817         magic = TDB_RECOVERY_MAGIC;
818         CONVERT(magic);
819
820         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
821
822         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
823                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
824                 tdb->ecode = TDB_ERR_IO;
825                 return -1;
826         }
827         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
828                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
829                 tdb->ecode = TDB_ERR_IO;
830                 return -1;
831         }
832
833         /* ensure the recovery magic marker is on disk */
834         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
835                 return -1;
836         }
837
838         return 0;
839 }
840
841 /*
842   commit the current transaction
843 */
844 int tdb_transaction_commit(struct tdb_context *tdb)
845 {       
846         const struct tdb_methods *methods;
847         tdb_off_t magic_offset = 0;
848         uint32_t zero = 0;
849         int i;
850
851         tdb_trace(tdb, "tdb_transaction_commit\n");
852         if (tdb->transaction == NULL) {
853                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
854                 return -1;
855         }
856
857         if (tdb->transaction->transaction_error) {
858                 tdb->ecode = TDB_ERR_IO;
859                 tdb_transaction_cancel(tdb);
860                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
861                 return -1;
862         }
863
864
865         if (tdb->transaction->nesting != 0) {
866                 tdb->transaction->nesting--;
867                 return 0;
868         }               
869
870         /* check for a null transaction */
871         if (tdb->transaction->blocks == NULL) {
872                 tdb_transaction_cancel(tdb);
873                 return 0;
874         }
875
876         methods = tdb->transaction->io_methods;
877         
878         /* if there are any locks pending then the caller has not
879            nested their locks properly, so fail the transaction */
880         if (tdb->num_locks || tdb->global_lock.count) {
881                 tdb->ecode = TDB_ERR_LOCK;
882                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
883                 tdb_transaction_cancel(tdb);
884                 return -1;
885         }
886
887         /* upgrade the main transaction lock region to a write lock */
888         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
889                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
890                 tdb->ecode = TDB_ERR_LOCK;
891                 tdb_transaction_cancel(tdb);
892                 return -1;
893         }
894
895         /* get the global lock - this prevents new users attaching to the database
896            during the commit */
897         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
898                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
899                 tdb->ecode = TDB_ERR_LOCK;
900                 tdb_transaction_cancel(tdb);
901                 return -1;
902         }
903
904         if (!(tdb->flags & TDB_NOSYNC)) {
905                 /* write the recovery data to the end of the file */
906                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
907                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
908                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
909                         tdb_transaction_cancel(tdb);
910                         return -1;
911                 }
912         }
913
914         /* expand the file to the new size if needed */
915         if (tdb->map_size != tdb->transaction->old_map_size) {
916                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
917                                              tdb->map_size - 
918                                              tdb->transaction->old_map_size) == -1) {
919                         tdb->ecode = TDB_ERR_IO;
920                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
921                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
922                         tdb_transaction_cancel(tdb);
923                         return -1;
924                 }
925                 tdb->map_size = tdb->transaction->old_map_size;
926                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
927         }
928
929         /* perform all the writes */
930         for (i=0;i<tdb->transaction->num_blocks;i++) {
931                 tdb_off_t offset;
932                 tdb_len_t length;
933
934                 if (tdb->transaction->blocks[i] == NULL) {
935                         continue;
936                 }
937
938                 offset = i * tdb->transaction->block_size;
939                 length = tdb->transaction->block_size;
940                 if (i == tdb->transaction->num_blocks-1) {
941                         length = tdb->transaction->last_block_size;
942                 }
943
944                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
945                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
946                         
947                         /* we've overwritten part of the data and
948                            possibly expanded the file, so we need to
949                            run the crash recovery code */
950                         tdb->methods = methods;
951                         tdb_transaction_recover(tdb); 
952
953                         tdb_transaction_cancel(tdb);
954                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
955
956                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
957                         return -1;
958                 }
959                 SAFE_FREE(tdb->transaction->blocks[i]);
960         } 
961
962         SAFE_FREE(tdb->transaction->blocks);
963         tdb->transaction->num_blocks = 0;
964
965         if (!(tdb->flags & TDB_NOSYNC)) {
966                 /* ensure the new data is on disk */
967                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
968                         return -1;
969                 }
970
971                 /* remove the recovery marker */
972                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
973                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
974                         return -1;
975                 }
976
977                 /* ensure the recovery marker has been removed on disk */
978                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
979                         return -1;
980                 }
981         }
982
983         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
984
985         /*
986           TODO: maybe write to some dummy hdr field, or write to magic
987           offset without mmap, before the last sync, instead of the
988           utime() call
989         */
990
991         /* on some systems (like Linux 2.6.x) changes via mmap/msync
992            don't change the mtime of the file, this means the file may
993            not be backed up (as tdb rounding to block sizes means that
994            file size changes are quite rare too). The following forces
995            mtime changes when a transaction completes */
996 #if HAVE_UTIME
997         utime(tdb->name, NULL);
998 #endif
999
1000         /* use a transaction cancel to free memory and remove the
1001            transaction locks */
1002         tdb_transaction_cancel(tdb);
1003
1004         return 0;
1005 }
1006
1007
1008 /*
1009   recover from an aborted transaction. Must be called with exclusive
1010   database write access already established (including the global
1011   lock to prevent new processes attaching)
1012 */
1013 int tdb_transaction_recover(struct tdb_context *tdb)
1014 {
1015         tdb_off_t recovery_head, recovery_eof;
1016         unsigned char *data, *p;
1017         uint32_t zero = 0;
1018         struct list_struct rec;
1019
1020         /* find the recovery area */
1021         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1022                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1023                 tdb->ecode = TDB_ERR_IO;
1024                 return -1;
1025         }
1026
1027         if (recovery_head == 0) {
1028                 /* we have never allocated a recovery record */
1029                 return 0;
1030         }
1031
1032         /* read the recovery record */
1033         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1034                                    sizeof(rec), DOCONV()) == -1) {
1035                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1036                 tdb->ecode = TDB_ERR_IO;
1037                 return -1;
1038         }
1039
1040         if (rec.magic != TDB_RECOVERY_MAGIC) {
1041                 /* there is no valid recovery data */
1042                 return 0;
1043         }
1044
1045         if (tdb->read_only) {
1046                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1047                 tdb->ecode = TDB_ERR_CORRUPT;
1048                 return -1;
1049         }
1050
1051         recovery_eof = rec.key_len;
1052
1053         data = (unsigned char *)malloc(rec.data_len);
1054         if (data == NULL) {
1055                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1056                 tdb->ecode = TDB_ERR_OOM;
1057                 return -1;
1058         }
1059
1060         /* read the full recovery data */
1061         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1062                                    rec.data_len, 0) == -1) {
1063                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1064                 tdb->ecode = TDB_ERR_IO;
1065                 return -1;
1066         }
1067
1068         /* recover the file data */
1069         p = data;
1070         while (p+8 < data + rec.data_len) {
1071                 uint32_t ofs, len;
1072                 if (DOCONV()) {
1073                         tdb_convert(p, 8);
1074                 }
1075                 memcpy(&ofs, p, 4);
1076                 memcpy(&len, p+4, 4);
1077
1078                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1079                         free(data);
1080                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1081                         tdb->ecode = TDB_ERR_IO;
1082                         return -1;
1083                 }
1084                 p += 8 + len;
1085         }
1086
1087         free(data);
1088
1089         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1090                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1091                 tdb->ecode = TDB_ERR_IO;
1092                 return -1;
1093         }
1094
1095         /* if the recovery area is after the recovered eof then remove it */
1096         if (recovery_eof <= recovery_head) {
1097                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1098                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1099                         tdb->ecode = TDB_ERR_IO;
1100                         return -1;                      
1101                 }
1102         }
1103
1104         /* remove the recovery magic */
1105         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1106                           &zero) == -1) {
1107                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1108                 tdb->ecode = TDB_ERR_IO;
1109                 return -1;                      
1110         }
1111         
1112         /* reduce the file size to the old size */
1113         tdb_munmap(tdb);
1114         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1115                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1116                 tdb->ecode = TDB_ERR_IO;
1117                 return -1;                      
1118         }
1119         tdb->map_size = recovery_eof;
1120         tdb_mmap(tdb);
1121
1122         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1123                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1124                 tdb->ecode = TDB_ERR_IO;
1125                 return -1;
1126         }
1127
1128         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1129                  recovery_eof));
1130
1131         /* all done */
1132         return 0;
1133 }