]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
6a34c4526993d73d25a13778474739a3afa0b971
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* old file size before transaction */
125         tdb_len_t old_map_size;
126 };
127
128
129 /*
130   read while in a transaction. We need to check first if the data is in our list
131   of transaction elements, then if not do a real read
132 */
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
134                             tdb_len_t len, int cv)
135 {
136         uint32_t blk;
137
138         /* break it down into block sized ops */
139         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
140                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
142                         return -1;
143                 }
144                 len -= len2;
145                 off += len2;
146                 buf = (void *)(len2 + (char *)buf);
147         }
148
149         if (len == 0) {
150                 return 0;
151         }
152
153         blk = off / tdb->transaction->block_size;
154
155         /* see if we have it in the block list */
156         if (tdb->transaction->num_blocks <= blk ||
157             tdb->transaction->blocks[blk] == NULL) {
158                 /* nope, do a real read */
159                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
160                         goto fail;
161                 }
162                 return 0;
163         }
164
165         /* it is in the block list. Now check for the last block */
166         if (blk == tdb->transaction->num_blocks-1) {
167                 if (len > tdb->transaction->last_block_size) {
168                         goto fail;
169                 }
170         }
171         
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
174         if (cv) {
175                 tdb_convert(buf, len);
176         }
177         return 0;
178
179 fail:
180         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181         tdb->ecode = TDB_ERR_IO;
182         tdb->transaction->transaction_error = 1;
183         return -1;
184 }
185
186
187 /*
188   write while in a transaction
189 */
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
191                              const void *buf, tdb_len_t len)
192 {
193         uint32_t blk;
194
195         /* if the write is to a hash head, then update the transaction
196            hash heads */
197         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
201         }
202
203         /* break it up into block sized chunks */
204         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
205                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206                 if (transaction_write(tdb, off, buf, len2) != 0) {
207                         return -1;
208                 }
209                 len -= len2;
210                 off += len2;
211                 if (buf != NULL) {
212                         buf = (const void *)(len2 + (const char *)buf);
213                 }
214         }
215
216         if (len == 0) {
217                 return 0;
218         }
219
220         blk = off / tdb->transaction->block_size;
221         off = off % tdb->transaction->block_size;
222
223         if (tdb->transaction->num_blocks <= blk) {
224                 uint8_t **new_blocks;
225                 /* expand the blocks array */
226                 if (tdb->transaction->blocks == NULL) {
227                         new_blocks = (uint8_t **)malloc(
228                                 (blk+1)*sizeof(uint8_t *));
229                 } else {
230                         new_blocks = (uint8_t **)realloc(
231                                 tdb->transaction->blocks,
232                                 (blk+1)*sizeof(uint8_t *));
233                 }
234                 if (new_blocks == NULL) {
235                         tdb->ecode = TDB_ERR_OOM;
236                         goto fail;
237                 }
238                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
239                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240                 tdb->transaction->blocks = new_blocks;
241                 tdb->transaction->num_blocks = blk+1;
242                 tdb->transaction->last_block_size = 0;
243         }
244
245         /* allocate and fill a block? */
246         if (tdb->transaction->blocks[blk] == NULL) {
247                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248                 if (tdb->transaction->blocks[blk] == NULL) {
249                         tdb->ecode = TDB_ERR_OOM;
250                         tdb->transaction->transaction_error = 1;
251                         return -1;                      
252                 }
253                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254                         tdb_len_t len2 = tdb->transaction->block_size;
255                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
257                         }
258                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
259                                                                    tdb->transaction->blocks[blk], 
260                                                                    len2, 0) != 0) {
261                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
262                                 tdb->ecode = TDB_ERR_IO;
263                                 goto fail;
264                         }
265                         if (blk == tdb->transaction->num_blocks-1) {
266                                 tdb->transaction->last_block_size = len2;
267                         }                       
268                 }
269         }
270         
271         /* overwrite part of an existing block */
272         if (buf == NULL) {
273                 memset(tdb->transaction->blocks[blk] + off, 0, len);
274         } else {
275                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
276         }
277         if (blk == tdb->transaction->num_blocks-1) {
278                 if (len + off > tdb->transaction->last_block_size) {
279                         tdb->transaction->last_block_size = len + off;
280                 }
281         }
282
283         return 0;
284
285 fail:
286         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
287                  (blk*tdb->transaction->block_size) + off, len));
288         tdb->transaction->transaction_error = 1;
289         return -1;
290 }
291
292
293 /*
294   write while in a transaction - this varient never expands the transaction blocks, it only
295   updates existing blocks. This means it cannot change the recovery size
296 */
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
298                                       const void *buf, tdb_len_t len)
299 {
300         uint32_t blk;
301
302         /* break it up into block sized chunks */
303         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
304                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
306                         return -1;
307                 }
308                 len -= len2;
309                 off += len2;
310                 if (buf != NULL) {
311                         buf = (const void *)(len2 + (const char *)buf);
312                 }
313         }
314
315         if (len == 0) {
316                 return 0;
317         }
318
319         blk = off / tdb->transaction->block_size;
320         off = off % tdb->transaction->block_size;
321
322         if (tdb->transaction->num_blocks <= blk ||
323             tdb->transaction->blocks[blk] == NULL) {
324                 return 0;
325         }
326
327         if (blk == tdb->transaction->num_blocks-1 &&
328             off + len > tdb->transaction->last_block_size) {
329                 if (off >= tdb->transaction->last_block_size) {
330                         return 0;
331                 }
332                 len = tdb->transaction->last_block_size - off;
333         }
334
335         /* overwrite part of an existing block */
336         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
337
338         return 0;
339 }
340
341
342 /*
343   accelerated hash chain head search, using the cached hash heads
344 */
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
346 {
347         uint32_t h = *chain;
348         for (;h < tdb->header.hash_size;h++) {
349                 /* the +1 takes account of the freelist */
350                 if (0 != tdb->transaction->hash_heads[h+1]) {
351                         break;
352                 }
353         }
354         (*chain) = h;
355 }
356
357 /*
358   out of bounds check during a transaction
359 */
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
361 {
362         if (len <= tdb->map_size) {
363                 return 0;
364         }
365         return TDB_ERRCODE(TDB_ERR_IO, -1);
366 }
367
368 /*
369   transaction version of tdb_expand().
370 */
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
372                                    tdb_off_t addition)
373 {
374         /* add a write to the transaction elements, so subsequent
375            reads see the zero data */
376         if (transaction_write(tdb, size, NULL, addition) != 0) {
377                 return -1;
378         }
379
380         return 0;
381 }
382
383 /*
384   brlock during a transaction - ignore them
385 */
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
387                               int rw_type, int lck_type, int probe, size_t len)
388 {
389         return 0;
390 }
391
392 static const struct tdb_methods transaction_methods = {
393         transaction_read,
394         transaction_write,
395         transaction_next_hash_chain,
396         transaction_oob,
397         transaction_expand_file,
398         transaction_brlock
399 };
400
401
402 /*
403   start a tdb transaction. No token is returned, as only a single
404   transaction is allowed to be pending per tdb_context
405 */
406 int tdb_transaction_start(struct tdb_context *tdb)
407 {
408         /* some sanity checks */
409         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
410                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
411                 tdb->ecode = TDB_ERR_EINVAL;
412                 return -1;
413         }
414
415         /* cope with nested tdb_transaction_start() calls */
416         if (tdb->transaction != NULL) {
417                 if (!tdb->flags & TDB_NO_NESTING) {
418                         tdb->transaction->nesting++;
419                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
420                                  tdb->transaction->nesting));
421                         return 0;
422                 } else {
423                         tdb_transaction_cancel(tdb);
424                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
425                 }
426         }
427
428         if (tdb->num_locks != 0 || tdb->global_lock.count) {
429                 /* the caller must not have any locks when starting a
430                    transaction as otherwise we'll be screwed by lack
431                    of nested locks in posix */
432                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
433                 tdb->ecode = TDB_ERR_LOCK;
434                 return -1;
435         }
436
437         if (tdb->travlocks.next != NULL) {
438                 /* you cannot use transactions inside a traverse (although you can use
439                    traverse inside a transaction) as otherwise you can end up with
440                    deadlock */
441                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
442                 tdb->ecode = TDB_ERR_LOCK;
443                 return -1;
444         }
445
446         tdb->transaction = (struct tdb_transaction *)
447                 calloc(sizeof(struct tdb_transaction), 1);
448         if (tdb->transaction == NULL) {
449                 tdb->ecode = TDB_ERR_OOM;
450                 return -1;
451         }
452
453         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
454         tdb->transaction->block_size = tdb->page_size;
455
456         /* get the transaction write lock. This is a blocking lock. As
457            discussed with Volker, there are a number of ways we could
458            make this async, which we will probably do in the future */
459         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
460                 SAFE_FREE(tdb->transaction->blocks);
461                 SAFE_FREE(tdb->transaction);
462                 return -1;
463         }
464         
465         /* get a read lock from the freelist to the end of file. This
466            is upgraded to a write lock during the commit */
467         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
468                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
469                 tdb->ecode = TDB_ERR_LOCK;
470                 goto fail;
471         }
472
473         /* setup a copy of the hash table heads so the hash scan in
474            traverse can be fast */
475         tdb->transaction->hash_heads = (uint32_t *)
476                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
477         if (tdb->transaction->hash_heads == NULL) {
478                 tdb->ecode = TDB_ERR_OOM;
479                 goto fail;
480         }
481         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
482                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
483                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
484                 tdb->ecode = TDB_ERR_IO;
485                 goto fail;
486         }
487
488         /* make sure we know about any file expansions already done by
489            anyone else */
490         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
491         tdb->transaction->old_map_size = tdb->map_size;
492
493         /* finally hook the io methods, replacing them with
494            transaction specific methods */
495         tdb->transaction->io_methods = tdb->methods;
496         tdb->methods = &transaction_methods;
497
498         return 0;
499         
500 fail:
501         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
502         tdb_transaction_unlock(tdb);
503         SAFE_FREE(tdb->transaction->blocks);
504         SAFE_FREE(tdb->transaction->hash_heads);
505         SAFE_FREE(tdb->transaction);
506         return -1;
507 }
508
509
510 /*
511   cancel the current transaction
512 */
513 int tdb_transaction_cancel(struct tdb_context *tdb)
514 {       
515         int i;
516
517         if (tdb->transaction == NULL) {
518                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
519                 return -1;
520         }
521
522         if (tdb->transaction->nesting != 0) {
523                 tdb->transaction->transaction_error = 1;
524                 tdb->transaction->nesting--;
525                 return 0;
526         }               
527
528         tdb->map_size = tdb->transaction->old_map_size;
529
530         /* free all the transaction blocks */
531         for (i=0;i<tdb->transaction->num_blocks;i++) {
532                 if (tdb->transaction->blocks[i] != NULL) {
533                         free(tdb->transaction->blocks[i]);
534                 }
535         }
536         SAFE_FREE(tdb->transaction->blocks);
537
538         /* remove any global lock created during the transaction */
539         if (tdb->global_lock.count != 0) {
540                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
541                 tdb->global_lock.count = 0;
542         }
543
544         /* remove any locks created during the transaction */
545         if (tdb->num_locks != 0) {
546                 for (i=0;i<tdb->num_lockrecs;i++) {
547                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
548                                    F_UNLCK,F_SETLKW, 0, 1);
549                 }
550                 tdb->num_locks = 0;
551                 tdb->num_lockrecs = 0;
552                 SAFE_FREE(tdb->lockrecs);
553         }
554
555         /* restore the normal io methods */
556         tdb->methods = tdb->transaction->io_methods;
557
558         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
559         tdb_transaction_unlock(tdb);
560         SAFE_FREE(tdb->transaction->hash_heads);
561         SAFE_FREE(tdb->transaction);
562         
563         return 0;
564 }
565
566 /*
567   sync to disk
568 */
569 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
570 {       
571         if (fsync(tdb->fd) != 0) {
572                 tdb->ecode = TDB_ERR_IO;
573                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
574                 return -1;
575         }
576 #ifdef MS_SYNC
577         if (tdb->map_ptr) {
578                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
579                 if (msync(moffset + (char *)tdb->map_ptr, 
580                           length + (offset - moffset), MS_SYNC) != 0) {
581                         tdb->ecode = TDB_ERR_IO;
582                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
583                                  strerror(errno)));
584                         return -1;
585                 }
586         }
587 #endif
588         return 0;
589 }
590
591
592 /*
593   work out how much space the linearised recovery data will consume
594 */
595 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
596 {
597         tdb_len_t recovery_size = 0;
598         int i;
599
600         recovery_size = sizeof(uint32_t);
601         for (i=0;i<tdb->transaction->num_blocks;i++) {
602                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
603                         break;
604                 }
605                 if (tdb->transaction->blocks[i] == NULL) {
606                         continue;
607                 }
608                 recovery_size += 2*sizeof(tdb_off_t);
609                 if (i == tdb->transaction->num_blocks-1) {
610                         recovery_size += tdb->transaction->last_block_size;
611                 } else {
612                         recovery_size += tdb->transaction->block_size;
613                 }
614         }       
615
616         return recovery_size;
617 }
618
619 /*
620   allocate the recovery area, or use an existing recovery area if it is
621   large enough
622 */
623 static int tdb_recovery_allocate(struct tdb_context *tdb, 
624                                  tdb_len_t *recovery_size,
625                                  tdb_off_t *recovery_offset,
626                                  tdb_len_t *recovery_max_size)
627 {
628         struct list_struct rec;
629         const struct tdb_methods *methods = tdb->transaction->io_methods;
630         tdb_off_t recovery_head;
631
632         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
633                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
634                 return -1;
635         }
636
637         rec.rec_len = 0;
638
639         if (recovery_head != 0 && 
640             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
641                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
642                 return -1;
643         }
644
645         *recovery_size = tdb_recovery_size(tdb);
646
647         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
648                 /* it fits in the existing area */
649                 *recovery_max_size = rec.rec_len;
650                 *recovery_offset = recovery_head;
651                 return 0;
652         }
653
654         /* we need to free up the old recovery area, then allocate a
655            new one at the end of the file. Note that we cannot use
656            tdb_allocate() to allocate the new one as that might return
657            us an area that is being currently used (as of the start of
658            the transaction) */
659         if (recovery_head != 0) {
660                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
661                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
662                         return -1;
663                 }
664         }
665
666         /* the tdb_free() call might have increased the recovery size */
667         *recovery_size = tdb_recovery_size(tdb);
668
669         /* round up to a multiple of page size */
670         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
671         *recovery_offset = tdb->map_size;
672         recovery_head = *recovery_offset;
673
674         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
675                                      (tdb->map_size - tdb->transaction->old_map_size) +
676                                      sizeof(rec) + *recovery_max_size) == -1) {
677                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
678                 return -1;
679         }
680
681         /* remap the file (if using mmap) */
682         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
683
684         /* we have to reset the old map size so that we don't try to expand the file
685            again in the transaction commit, which would destroy the recovery area */
686         tdb->transaction->old_map_size = tdb->map_size;
687
688         /* write the recovery header offset and sync - we can sync without a race here
689            as the magic ptr in the recovery record has not been set */
690         CONVERT(recovery_head);
691         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
692                                &recovery_head, sizeof(tdb_off_t)) == -1) {
693                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
694                 return -1;
695         }
696         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
697                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
698                 return -1;
699         }
700
701         return 0;
702 }
703
704
705 /*
706   setup the recovery data that will be used on a crash during commit
707 */
708 static int transaction_setup_recovery(struct tdb_context *tdb, 
709                                       tdb_off_t *magic_offset)
710 {
711         tdb_len_t recovery_size;
712         unsigned char *data, *p;
713         const struct tdb_methods *methods = tdb->transaction->io_methods;
714         struct list_struct *rec;
715         tdb_off_t recovery_offset, recovery_max_size;
716         tdb_off_t old_map_size = tdb->transaction->old_map_size;
717         uint32_t magic, tailer;
718         int i;
719
720         /*
721           check that the recovery area has enough space
722         */
723         if (tdb_recovery_allocate(tdb, &recovery_size, 
724                                   &recovery_offset, &recovery_max_size) == -1) {
725                 return -1;
726         }
727
728         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
729         if (data == NULL) {
730                 tdb->ecode = TDB_ERR_OOM;
731                 return -1;
732         }
733
734         rec = (struct list_struct *)data;
735         memset(rec, 0, sizeof(*rec));
736
737         rec->magic    = 0;
738         rec->data_len = recovery_size;
739         rec->rec_len  = recovery_max_size;
740         rec->key_len  = old_map_size;
741         CONVERT(rec);
742
743         /* build the recovery data into a single blob to allow us to do a single
744            large write, which should be more efficient */
745         p = data + sizeof(*rec);
746         for (i=0;i<tdb->transaction->num_blocks;i++) {
747                 tdb_off_t offset;
748                 tdb_len_t length;
749
750                 if (tdb->transaction->blocks[i] == NULL) {
751                         continue;
752                 }
753
754                 offset = i * tdb->transaction->block_size;
755                 length = tdb->transaction->block_size;
756                 if (i == tdb->transaction->num_blocks-1) {
757                         length = tdb->transaction->last_block_size;
758                 }
759                 
760                 if (offset >= old_map_size) {
761                         continue;
762                 }
763                 if (offset + length > tdb->transaction->old_map_size) {
764                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
765                         free(data);
766                         tdb->ecode = TDB_ERR_CORRUPT;
767                         return -1;
768                 }
769                 memcpy(p, &offset, 4);
770                 memcpy(p+4, &length, 4);
771                 if (DOCONV()) {
772                         tdb_convert(p, 8);
773                 }
774                 /* the recovery area contains the old data, not the
775                    new data, so we have to call the original tdb_read
776                    method to get it */
777                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
778                         free(data);
779                         tdb->ecode = TDB_ERR_IO;
780                         return -1;
781                 }
782                 p += 8 + length;
783         }
784
785         /* and the tailer */
786         tailer = sizeof(*rec) + recovery_max_size;
787         memcpy(p, &tailer, 4);
788         CONVERT(p);
789
790         /* write the recovery data to the recovery area */
791         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
792                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
793                 free(data);
794                 tdb->ecode = TDB_ERR_IO;
795                 return -1;
796         }
797         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
798                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
799                 free(data);
800                 tdb->ecode = TDB_ERR_IO;
801                 return -1;
802         }
803
804         /* as we don't have ordered writes, we have to sync the recovery
805            data before we update the magic to indicate that the recovery
806            data is present */
807         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
808                 free(data);
809                 return -1;
810         }
811
812         free(data);
813
814         magic = TDB_RECOVERY_MAGIC;
815         CONVERT(magic);
816
817         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
818
819         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
820                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
821                 tdb->ecode = TDB_ERR_IO;
822                 return -1;
823         }
824         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
825                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
826                 tdb->ecode = TDB_ERR_IO;
827                 return -1;
828         }
829
830         /* ensure the recovery magic marker is on disk */
831         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
832                 return -1;
833         }
834
835         return 0;
836 }
837
838 /*
839   commit the current transaction
840 */
841 int tdb_transaction_commit(struct tdb_context *tdb)
842 {       
843         const struct tdb_methods *methods;
844         tdb_off_t magic_offset = 0;
845         uint32_t zero = 0;
846         int i;
847
848         if (tdb->transaction == NULL) {
849                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
850                 return -1;
851         }
852
853         if (tdb->transaction->transaction_error) {
854                 tdb->ecode = TDB_ERR_IO;
855                 tdb_transaction_cancel(tdb);
856                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
857                 return -1;
858         }
859
860
861         if (tdb->transaction->nesting != 0) {
862                 tdb->transaction->nesting--;
863                 return 0;
864         }               
865
866         /* check for a null transaction */
867         if (tdb->transaction->blocks == NULL) {
868                 tdb_transaction_cancel(tdb);
869                 return 0;
870         }
871
872         methods = tdb->transaction->io_methods;
873         
874         /* if there are any locks pending then the caller has not
875            nested their locks properly, so fail the transaction */
876         if (tdb->num_locks || tdb->global_lock.count) {
877                 tdb->ecode = TDB_ERR_LOCK;
878                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
879                 tdb_transaction_cancel(tdb);
880                 return -1;
881         }
882
883         /* upgrade the main transaction lock region to a write lock */
884         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
885                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
886                 tdb->ecode = TDB_ERR_LOCK;
887                 tdb_transaction_cancel(tdb);
888                 return -1;
889         }
890
891         /* get the global lock - this prevents new users attaching to the database
892            during the commit */
893         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
894                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
895                 tdb->ecode = TDB_ERR_LOCK;
896                 tdb_transaction_cancel(tdb);
897                 return -1;
898         }
899
900         if (!(tdb->flags & TDB_NOSYNC)) {
901                 /* write the recovery data to the end of the file */
902                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
903                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
904                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
905                         tdb_transaction_cancel(tdb);
906                         return -1;
907                 }
908         }
909
910         /* expand the file to the new size if needed */
911         if (tdb->map_size != tdb->transaction->old_map_size) {
912                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
913                                              tdb->map_size - 
914                                              tdb->transaction->old_map_size) == -1) {
915                         tdb->ecode = TDB_ERR_IO;
916                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
917                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
918                         tdb_transaction_cancel(tdb);
919                         return -1;
920                 }
921                 tdb->map_size = tdb->transaction->old_map_size;
922                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
923         }
924
925         /* perform all the writes */
926         for (i=0;i<tdb->transaction->num_blocks;i++) {
927                 tdb_off_t offset;
928                 tdb_len_t length;
929
930                 if (tdb->transaction->blocks[i] == NULL) {
931                         continue;
932                 }
933
934                 offset = i * tdb->transaction->block_size;
935                 length = tdb->transaction->block_size;
936                 if (i == tdb->transaction->num_blocks-1) {
937                         length = tdb->transaction->last_block_size;
938                 }
939
940                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
941                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
942                         
943                         /* we've overwritten part of the data and
944                            possibly expanded the file, so we need to
945                            run the crash recovery code */
946                         tdb->methods = methods;
947                         tdb_transaction_recover(tdb); 
948
949                         tdb_transaction_cancel(tdb);
950                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
951
952                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
953                         return -1;
954                 }
955                 SAFE_FREE(tdb->transaction->blocks[i]);
956         } 
957
958         SAFE_FREE(tdb->transaction->blocks);
959         tdb->transaction->num_blocks = 0;
960
961         if (!(tdb->flags & TDB_NOSYNC)) {
962                 /* ensure the new data is on disk */
963                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
964                         return -1;
965                 }
966
967                 /* remove the recovery marker */
968                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
969                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
970                         return -1;
971                 }
972
973                 /* ensure the recovery marker has been removed on disk */
974                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
975                         return -1;
976                 }
977         }
978
979         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
980
981         /*
982           TODO: maybe write to some dummy hdr field, or write to magic
983           offset without mmap, before the last sync, instead of the
984           utime() call
985         */
986
987         /* on some systems (like Linux 2.6.x) changes via mmap/msync
988            don't change the mtime of the file, this means the file may
989            not be backed up (as tdb rounding to block sizes means that
990            file size changes are quite rare too). The following forces
991            mtime changes when a transaction completes */
992 #ifdef HAVE_UTIME
993         utime(tdb->name, NULL);
994 #endif
995
996         /* use a transaction cancel to free memory and remove the
997            transaction locks */
998         tdb_transaction_cancel(tdb);
999
1000         return 0;
1001 }
1002
1003
1004 /*
1005   recover from an aborted transaction. Must be called with exclusive
1006   database write access already established (including the global
1007   lock to prevent new processes attaching)
1008 */
1009 int tdb_transaction_recover(struct tdb_context *tdb)
1010 {
1011         tdb_off_t recovery_head, recovery_eof;
1012         unsigned char *data, *p;
1013         uint32_t zero = 0;
1014         struct list_struct rec;
1015
1016         /* find the recovery area */
1017         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1018                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1019                 tdb->ecode = TDB_ERR_IO;
1020                 return -1;
1021         }
1022
1023         if (recovery_head == 0) {
1024                 /* we have never allocated a recovery record */
1025                 return 0;
1026         }
1027
1028         /* read the recovery record */
1029         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1030                                    sizeof(rec), DOCONV()) == -1) {
1031                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1032                 tdb->ecode = TDB_ERR_IO;
1033                 return -1;
1034         }
1035
1036         if (rec.magic != TDB_RECOVERY_MAGIC) {
1037                 /* there is no valid recovery data */
1038                 return 0;
1039         }
1040
1041         if (tdb->read_only) {
1042                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1043                 tdb->ecode = TDB_ERR_CORRUPT;
1044                 return -1;
1045         }
1046
1047         recovery_eof = rec.key_len;
1048
1049         data = (unsigned char *)malloc(rec.data_len);
1050         if (data == NULL) {
1051                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1052                 tdb->ecode = TDB_ERR_OOM;
1053                 return -1;
1054         }
1055
1056         /* read the full recovery data */
1057         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1058                                    rec.data_len, 0) == -1) {
1059                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1060                 tdb->ecode = TDB_ERR_IO;
1061                 return -1;
1062         }
1063
1064         /* recover the file data */
1065         p = data;
1066         while (p+8 < data + rec.data_len) {
1067                 uint32_t ofs, len;
1068                 if (DOCONV()) {
1069                         tdb_convert(p, 8);
1070                 }
1071                 memcpy(&ofs, p, 4);
1072                 memcpy(&len, p+4, 4);
1073
1074                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1075                         free(data);
1076                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1077                         tdb->ecode = TDB_ERR_IO;
1078                         return -1;
1079                 }
1080                 p += 8 + len;
1081         }
1082
1083         free(data);
1084
1085         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1086                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1087                 tdb->ecode = TDB_ERR_IO;
1088                 return -1;
1089         }
1090
1091         /* if the recovery area is after the recovered eof then remove it */
1092         if (recovery_eof <= recovery_head) {
1093                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1094                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1095                         tdb->ecode = TDB_ERR_IO;
1096                         return -1;                      
1097                 }
1098         }
1099
1100         /* remove the recovery magic */
1101         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1102                           &zero) == -1) {
1103                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1104                 tdb->ecode = TDB_ERR_IO;
1105                 return -1;                      
1106         }
1107         
1108         /* reduce the file size to the old size */
1109         tdb_munmap(tdb);
1110         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1111                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1112                 tdb->ecode = TDB_ERR_IO;
1113                 return -1;                      
1114         }
1115         tdb->map_size = recovery_eof;
1116         tdb_mmap(tdb);
1117
1118         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1119                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1120                 tdb->ecode = TDB_ERR_IO;
1121                 return -1;
1122         }
1123
1124         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1125                  recovery_eof));
1126
1127         /* all done */
1128         return 0;
1129 }