]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
Import from SAMBA's tdb:
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* set when a prepare has already occurred */
125         bool prepared;
126         tdb_off_t magic_offset;
127
128         /* old file size before transaction */
129         tdb_len_t old_map_size;
130 };
131
132
133 /*
134   read while in a transaction. We need to check first if the data is in our list
135   of transaction elements, then if not do a real read
136 */
137 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
138                             tdb_len_t len, int cv)
139 {
140         uint32_t blk;
141
142         /* Only a commit is allowed on a prepared transaction */
143         if (tdb->transaction->prepared) {
144                 tdb->ecode = TDB_ERR_EINVAL;
145                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
146                 tdb->transaction->transaction_error = 1;
147                 return -1;
148         }
149
150         /* break it down into block sized ops */
151         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
152                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
153                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
154                         return -1;
155                 }
156                 len -= len2;
157                 off += len2;
158                 buf = (void *)(len2 + (char *)buf);
159         }
160
161         if (len == 0) {
162                 return 0;
163         }
164
165         blk = off / tdb->transaction->block_size;
166
167         /* see if we have it in the block list */
168         if (tdb->transaction->num_blocks <= blk ||
169             tdb->transaction->blocks[blk] == NULL) {
170                 /* nope, do a real read */
171                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
172                         goto fail;
173                 }
174                 return 0;
175         }
176
177         /* it is in the block list. Now check for the last block */
178         if (blk == tdb->transaction->num_blocks-1) {
179                 if (len > tdb->transaction->last_block_size) {
180                         goto fail;
181                 }
182         }
183         
184         /* now copy it out of this block */
185         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
186         if (cv) {
187                 tdb_convert(buf, len);
188         }
189         return 0;
190
191 fail:
192         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
193         tdb->ecode = TDB_ERR_IO;
194         tdb->transaction->transaction_error = 1;
195         return -1;
196 }
197
198
199 /*
200   write while in a transaction
201 */
202 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
203                              const void *buf, tdb_len_t len)
204 {
205         uint32_t blk;
206
207         /* Only a commit is allowed on a prepared transaction */
208         if (tdb->transaction->prepared) {
209                 tdb->ecode = TDB_ERR_EINVAL;
210                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
211                 tdb->transaction->transaction_error = 1;
212                 return -1;
213         }
214
215         /* if the write is to a hash head, then update the transaction
216            hash heads */
217         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
218             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
219                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
220                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
221         }
222
223         /* break it up into block sized chunks */
224         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
225                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
226                 if (transaction_write(tdb, off, buf, len2) != 0) {
227                         return -1;
228                 }
229                 len -= len2;
230                 off += len2;
231                 if (buf != NULL) {
232                         buf = (const void *)(len2 + (const char *)buf);
233                 }
234         }
235
236         if (len == 0) {
237                 return 0;
238         }
239
240         blk = off / tdb->transaction->block_size;
241         off = off % tdb->transaction->block_size;
242
243         if (tdb->transaction->num_blocks <= blk) {
244                 uint8_t **new_blocks;
245                 /* expand the blocks array */
246                 if (tdb->transaction->blocks == NULL) {
247                         new_blocks = (uint8_t **)malloc(
248                                 (blk+1)*sizeof(uint8_t *));
249                 } else {
250                         new_blocks = (uint8_t **)realloc(
251                                 tdb->transaction->blocks,
252                                 (blk+1)*sizeof(uint8_t *));
253                 }
254                 if (new_blocks == NULL) {
255                         tdb->ecode = TDB_ERR_OOM;
256                         goto fail;
257                 }
258                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
259                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
260                 tdb->transaction->blocks = new_blocks;
261                 tdb->transaction->num_blocks = blk+1;
262                 tdb->transaction->last_block_size = 0;
263         }
264
265         /* allocate and fill a block? */
266         if (tdb->transaction->blocks[blk] == NULL) {
267                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
268                 if (tdb->transaction->blocks[blk] == NULL) {
269                         tdb->ecode = TDB_ERR_OOM;
270                         tdb->transaction->transaction_error = 1;
271                         return -1;                      
272                 }
273                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
274                         tdb_len_t len2 = tdb->transaction->block_size;
275                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
276                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
277                         }
278                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
279                                                                    tdb->transaction->blocks[blk], 
280                                                                    len2, 0) != 0) {
281                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
282                                 tdb->ecode = TDB_ERR_IO;
283                                 goto fail;
284                         }
285                         if (blk == tdb->transaction->num_blocks-1) {
286                                 tdb->transaction->last_block_size = len2;
287                         }                       
288                 }
289         }
290         
291         /* overwrite part of an existing block */
292         if (buf == NULL) {
293                 memset(tdb->transaction->blocks[blk] + off, 0, len);
294         } else {
295                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
296         }
297         if (blk == tdb->transaction->num_blocks-1) {
298                 if (len + off > tdb->transaction->last_block_size) {
299                         tdb->transaction->last_block_size = len + off;
300                 }
301         }
302
303         return 0;
304
305 fail:
306         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
307                  (blk*tdb->transaction->block_size) + off, len));
308         tdb->transaction->transaction_error = 1;
309         return -1;
310 }
311
312
313 /*
314   write while in a transaction - this varient never expands the transaction blocks, it only
315   updates existing blocks. This means it cannot change the recovery size
316 */
317 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
318                                       const void *buf, tdb_len_t len)
319 {
320         uint32_t blk;
321
322         /* break it up into block sized chunks */
323         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
324                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
325                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
326                         return -1;
327                 }
328                 len -= len2;
329                 off += len2;
330                 if (buf != NULL) {
331                         buf = (const void *)(len2 + (const char *)buf);
332                 }
333         }
334
335         if (len == 0) {
336                 return 0;
337         }
338
339         blk = off / tdb->transaction->block_size;
340         off = off % tdb->transaction->block_size;
341
342         if (tdb->transaction->num_blocks <= blk ||
343             tdb->transaction->blocks[blk] == NULL) {
344                 return 0;
345         }
346
347         if (blk == tdb->transaction->num_blocks-1 &&
348             off + len > tdb->transaction->last_block_size) {
349                 if (off >= tdb->transaction->last_block_size) {
350                         return 0;
351                 }
352                 len = tdb->transaction->last_block_size - off;
353         }
354
355         /* overwrite part of an existing block */
356         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
357
358         return 0;
359 }
360
361
362 /*
363   accelerated hash chain head search, using the cached hash heads
364 */
365 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
366 {
367         uint32_t h = *chain;
368         for (;h < tdb->header.hash_size;h++) {
369                 /* the +1 takes account of the freelist */
370                 if (0 != tdb->transaction->hash_heads[h+1]) {
371                         break;
372                 }
373         }
374         (*chain) = h;
375 }
376
377 /*
378   out of bounds check during a transaction
379 */
380 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
381 {
382         if (len <= tdb->map_size) {
383                 return 0;
384         }
385         return TDB_ERRCODE(TDB_ERR_IO, -1);
386 }
387
388 /*
389   transaction version of tdb_expand().
390 */
391 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
392                                    tdb_off_t addition)
393 {
394         /* add a write to the transaction elements, so subsequent
395            reads see the zero data */
396         if (transaction_write(tdb, size, NULL, addition) != 0) {
397                 return -1;
398         }
399
400         return 0;
401 }
402
403 /*
404   brlock during a transaction - ignore them
405 */
406 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
407                               int rw_type, int lck_type, int probe, size_t len)
408 {
409         return 0;
410 }
411
412 static const struct tdb_methods transaction_methods = {
413         transaction_read,
414         transaction_write,
415         transaction_next_hash_chain,
416         transaction_oob,
417         transaction_expand_file,
418         transaction_brlock
419 };
420
421 /*
422   sync to disk
423 */
424 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
425 {       
426         if (fsync(tdb->fd) != 0) {
427                 tdb->ecode = TDB_ERR_IO;
428                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
429                 return -1;
430         }
431 #ifdef MS_SYNC
432         if (tdb->map_ptr) {
433                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
434                 if (msync(moffset + (char *)tdb->map_ptr, 
435                           length + (offset - moffset), MS_SYNC) != 0) {
436                         tdb->ecode = TDB_ERR_IO;
437                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
438                                  strerror(errno)));
439                         return -1;
440                 }
441         }
442 #endif
443         return 0;
444 }
445
446 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
447 {
448         int i, ret = 0;
449
450         if (tdb->transaction == NULL) {
451                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
452                 return -1;
453         }
454
455         if (tdb->transaction->nesting != 0) {
456                 tdb->transaction->transaction_error = 1;
457                 tdb->transaction->nesting--;
458                 return 0;
459         }               
460
461         tdb->map_size = tdb->transaction->old_map_size;
462
463         /* free all the transaction blocks */
464         for (i=0;i<tdb->transaction->num_blocks;i++) {
465                 if (tdb->transaction->blocks[i] != NULL) {
466                         free(tdb->transaction->blocks[i]);
467                 }
468         }
469         SAFE_FREE(tdb->transaction->blocks);
470
471         if (tdb->transaction->magic_offset) {
472                 const struct tdb_methods *methods = tdb->transaction->io_methods;
473                 uint32_t zero = 0;
474
475                 /* remove the recovery marker */
476                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
477                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
478                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
479                         ret = -1;
480                 }
481         }
482
483         /* remove any global lock created during the transaction */
484         if (tdb->global_lock.count != 0) {
485                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
486                 tdb->global_lock.count = 0;
487         }
488
489         /* remove any locks created during the transaction */
490         if (tdb->num_locks != 0) {
491                 for (i=0;i<tdb->num_lockrecs;i++) {
492                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
493                                    F_UNLCK,F_SETLKW, 0, 1);
494                 }
495                 tdb->num_locks = 0;
496                 tdb->num_lockrecs = 0;
497                 SAFE_FREE(tdb->lockrecs);
498         }
499
500         /* restore the normal io methods */
501         tdb->methods = tdb->transaction->io_methods;
502
503         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
504         tdb_transaction_unlock(tdb);
505         SAFE_FREE(tdb->transaction->hash_heads);
506         SAFE_FREE(tdb->transaction);
507         
508         return ret;
509 }
510
511 /*
512   start a tdb transaction. No token is returned, as only a single
513   transaction is allowed to be pending per tdb_context
514 */
515 int tdb_transaction_start(struct tdb_context *tdb)
516 {
517         /* some sanity checks */
518         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
519                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
520                 tdb->ecode = TDB_ERR_EINVAL;
521                 return -1;
522         }
523
524         /* cope with nested tdb_transaction_start() calls */
525         if (tdb->transaction != NULL) {
526                 tdb_trace(tdb, "tdb_transaction_start");
527                 if (!tdb->flags & TDB_NO_NESTING) {
528                         tdb->transaction->nesting++;
529                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
530                                  tdb->transaction->nesting));
531                         return 0;
532                 } else {
533                         tdb_transaction_cancel_internal(tdb);
534                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
535                 }
536         }
537
538         if (tdb->num_locks != 0 || tdb->global_lock.count) {
539                 /* the caller must not have any locks when starting a
540                    transaction as otherwise we'll be screwed by lack
541                    of nested locks in posix */
542                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
543                 tdb->ecode = TDB_ERR_LOCK;
544                 return -1;
545         }
546
547         if (tdb->travlocks.next != NULL) {
548                 /* you cannot use transactions inside a traverse (although you can use
549                    traverse inside a transaction) as otherwise you can end up with
550                    deadlock */
551                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
552                 tdb->ecode = TDB_ERR_LOCK;
553                 return -1;
554         }
555
556         tdb->transaction = (struct tdb_transaction *)
557                 calloc(sizeof(struct tdb_transaction), 1);
558         if (tdb->transaction == NULL) {
559                 tdb->ecode = TDB_ERR_OOM;
560                 return -1;
561         }
562
563         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
564         tdb->transaction->block_size = tdb->page_size;
565
566         /* get the transaction write lock. This is a blocking lock. As
567            discussed with Volker, there are a number of ways we could
568            make this async, which we will probably do in the future */
569         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
570                 SAFE_FREE(tdb->transaction->blocks);
571                 SAFE_FREE(tdb->transaction);
572                 return -1;
573         }
574         
575         /* get a read lock from the freelist to the end of file. This
576            is upgraded to a write lock during the commit */
577         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
578                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
579                 tdb->ecode = TDB_ERR_LOCK;
580                 goto fail;
581         }
582
583         /* setup a copy of the hash table heads so the hash scan in
584            traverse can be fast */
585         tdb->transaction->hash_heads = (uint32_t *)
586                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
587         if (tdb->transaction->hash_heads == NULL) {
588                 tdb->ecode = TDB_ERR_OOM;
589                 goto fail;
590         }
591         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
592                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
593                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
594                 tdb->ecode = TDB_ERR_IO;
595                 goto fail;
596         }
597
598         /* make sure we know about any file expansions already done by
599            anyone else */
600         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
601         tdb->transaction->old_map_size = tdb->map_size;
602
603         /* finally hook the io methods, replacing them with
604            transaction specific methods */
605         tdb->transaction->io_methods = tdb->methods;
606         tdb->methods = &transaction_methods;
607
608         /* Trace at the end, so we get sequence number correct. */
609         tdb_trace(tdb, "tdb_transaction_start");
610         return 0;
611         
612 fail:
613         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
614         tdb_transaction_unlock(tdb);
615         SAFE_FREE(tdb->transaction->blocks);
616         SAFE_FREE(tdb->transaction->hash_heads);
617         SAFE_FREE(tdb->transaction);
618         return -1;
619 }
620
621
/*
  cancel the current transaction. This traces the call and then hands
  over to tdb_transaction_cancel_internal(), whose result is returned.
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
        int status;

        tdb_trace(tdb, "tdb_transaction_cancel");
        status = tdb_transaction_cancel_internal(tdb);
        return status;
}
630
631 /*
632   work out how much space the linearised recovery data will consume
633 */
634 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
635 {
636         tdb_len_t recovery_size = 0;
637         int i;
638
639         recovery_size = sizeof(uint32_t);
640         for (i=0;i<tdb->transaction->num_blocks;i++) {
641                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
642                         break;
643                 }
644                 if (tdb->transaction->blocks[i] == NULL) {
645                         continue;
646                 }
647                 recovery_size += 2*sizeof(tdb_off_t);
648                 if (i == tdb->transaction->num_blocks-1) {
649                         recovery_size += tdb->transaction->last_block_size;
650                 } else {
651                         recovery_size += tdb->transaction->block_size;
652                 }
653         }       
654
655         return recovery_size;
656 }
657
658 /*
659   allocate the recovery area, or use an existing recovery area if it is
660   large enough
661 */
662 static int tdb_recovery_allocate(struct tdb_context *tdb, 
663                                  tdb_len_t *recovery_size,
664                                  tdb_off_t *recovery_offset,
665                                  tdb_len_t *recovery_max_size)
666 {
667         struct list_struct rec;
668         const struct tdb_methods *methods = tdb->transaction->io_methods;
669         tdb_off_t recovery_head;
670
671         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
672                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
673                 return -1;
674         }
675
676         rec.rec_len = 0;
677
678         if (recovery_head != 0 && 
679             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
680                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
681                 return -1;
682         }
683
684         *recovery_size = tdb_recovery_size(tdb);
685
686         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
687                 /* it fits in the existing area */
688                 *recovery_max_size = rec.rec_len;
689                 *recovery_offset = recovery_head;
690                 return 0;
691         }
692
693         /* we need to free up the old recovery area, then allocate a
694            new one at the end of the file. Note that we cannot use
695            tdb_allocate() to allocate the new one as that might return
696            us an area that is being currently used (as of the start of
697            the transaction) */
698         if (recovery_head != 0) {
699                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
700                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
701                         return -1;
702                 }
703         }
704
705         /* the tdb_free() call might have increased the recovery size */
706         *recovery_size = tdb_recovery_size(tdb);
707
708         /* round up to a multiple of page size */
709         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
710         *recovery_offset = tdb->map_size;
711         recovery_head = *recovery_offset;
712
713         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
714                                      (tdb->map_size - tdb->transaction->old_map_size) +
715                                      sizeof(rec) + *recovery_max_size) == -1) {
716                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
717                 return -1;
718         }
719
720         /* remap the file (if using mmap) */
721         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
722
723         /* we have to reset the old map size so that we don't try to expand the file
724            again in the transaction commit, which would destroy the recovery area */
725         tdb->transaction->old_map_size = tdb->map_size;
726
727         /* write the recovery header offset and sync - we can sync without a race here
728            as the magic ptr in the recovery record has not been set */
729         CONVERT(recovery_head);
730         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
731                                &recovery_head, sizeof(tdb_off_t)) == -1) {
732                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
733                 return -1;
734         }
735         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
736                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
737                 return -1;
738         }
739
740         return 0;
741 }
742
743
744 /*
745   setup the recovery data that will be used on a crash during commit
746 */
747 static int transaction_setup_recovery(struct tdb_context *tdb, 
748                                       tdb_off_t *magic_offset)
749 {
750         tdb_len_t recovery_size;
751         unsigned char *data, *p;
752         const struct tdb_methods *methods = tdb->transaction->io_methods;
753         struct list_struct *rec;
754         tdb_off_t recovery_offset, recovery_max_size;
755         tdb_off_t old_map_size = tdb->transaction->old_map_size;
756         uint32_t magic, tailer;
757         int i;
758
759         /*
760           check that the recovery area has enough space
761         */
762         if (tdb_recovery_allocate(tdb, &recovery_size, 
763                                   &recovery_offset, &recovery_max_size) == -1) {
764                 return -1;
765         }
766
767         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
768         if (data == NULL) {
769                 tdb->ecode = TDB_ERR_OOM;
770                 return -1;
771         }
772
773         rec = (struct list_struct *)data;
774         memset(rec, 0, sizeof(*rec));
775
776         rec->magic    = 0;
777         rec->data_len = recovery_size;
778         rec->rec_len  = recovery_max_size;
779         rec->key_len  = old_map_size;
780         CONVERT(rec);
781
782         /* build the recovery data into a single blob to allow us to do a single
783            large write, which should be more efficient */
784         p = data + sizeof(*rec);
785         for (i=0;i<tdb->transaction->num_blocks;i++) {
786                 tdb_off_t offset;
787                 tdb_len_t length;
788
789                 if (tdb->transaction->blocks[i] == NULL) {
790                         continue;
791                 }
792
793                 offset = i * tdb->transaction->block_size;
794                 length = tdb->transaction->block_size;
795                 if (i == tdb->transaction->num_blocks-1) {
796                         length = tdb->transaction->last_block_size;
797                 }
798                 
799                 if (offset >= old_map_size) {
800                         continue;
801                 }
802                 if (offset + length > tdb->transaction->old_map_size) {
803                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
804                         free(data);
805                         tdb->ecode = TDB_ERR_CORRUPT;
806                         return -1;
807                 }
808                 memcpy(p, &offset, 4);
809                 memcpy(p+4, &length, 4);
810                 if (DOCONV()) {
811                         tdb_convert(p, 8);
812                 }
813                 /* the recovery area contains the old data, not the
814                    new data, so we have to call the original tdb_read
815                    method to get it */
816                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
817                         free(data);
818                         tdb->ecode = TDB_ERR_IO;
819                         return -1;
820                 }
821                 p += 8 + length;
822         }
823
824         /* and the tailer */
825         tailer = sizeof(*rec) + recovery_max_size;
826         memcpy(p, &tailer, 4);
827         CONVERT(p);
828
829         /* write the recovery data to the recovery area */
830         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
831                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
832                 free(data);
833                 tdb->ecode = TDB_ERR_IO;
834                 return -1;
835         }
836         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
837                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
838                 free(data);
839                 tdb->ecode = TDB_ERR_IO;
840                 return -1;
841         }
842
843         /* as we don't have ordered writes, we have to sync the recovery
844            data before we update the magic to indicate that the recovery
845            data is present */
846         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
847                 free(data);
848                 return -1;
849         }
850
851         free(data);
852
853         magic = TDB_RECOVERY_MAGIC;
854         CONVERT(magic);
855
856         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
857
858         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
859                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
860                 tdb->ecode = TDB_ERR_IO;
861                 return -1;
862         }
863         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
864                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
865                 tdb->ecode = TDB_ERR_IO;
866                 return -1;
867         }
868
869         /* ensure the recovery magic marker is on disk */
870         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
871                 return -1;
872         }
873
874         return 0;
875 }
876
877 static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
878 {       
879         const struct tdb_methods *methods;
880
881         if (tdb->transaction == NULL) {
882                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
883                 return -1;
884         }
885
886         if (tdb->transaction->prepared) {
887                 tdb->ecode = TDB_ERR_EINVAL;
888                 tdb_transaction_cancel(tdb);
889                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
890                 return -1;
891         }
892
893         if (tdb->transaction->transaction_error) {
894                 tdb->ecode = TDB_ERR_IO;
895                 tdb_transaction_cancel_internal(tdb);
896                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
897                 return -1;
898         }
899
900
901         if (tdb->transaction->nesting != 0) {
902                 tdb->transaction->nesting--;
903                 return 0;
904         }               
905
906 #ifdef TDB_TRACE
907         /* store seqnum now, before reading becomes illegal. */
908         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
909 #endif
910
911         /* check for a null transaction */
912         if (tdb->transaction->blocks == NULL) {
913                 return 0;
914         }
915
916         methods = tdb->transaction->io_methods;
917         
918         /* if there are any locks pending then the caller has not
919            nested their locks properly, so fail the transaction */
920         if (tdb->num_locks || tdb->global_lock.count) {
921                 tdb->ecode = TDB_ERR_LOCK;
922                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
923                 tdb_transaction_cancel_internal(tdb);
924                 return -1;
925         }
926
927         /* upgrade the main transaction lock region to a write lock */
928         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
929                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
930                 tdb->ecode = TDB_ERR_LOCK;
931                 tdb_transaction_cancel_internal(tdb);
932                 return -1;
933         }
934
935         /* get the global lock - this prevents new users attaching to the database
936            during the commit */
937         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
938                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
939                 tdb->ecode = TDB_ERR_LOCK;
940                 tdb_transaction_cancel_internal(tdb);
941                 return -1;
942         }
943
944         if (!(tdb->flags & TDB_NOSYNC)) {
945                 /* write the recovery data to the end of the file */
946                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
947                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
948                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
949                         tdb_transaction_cancel_internal(tdb);
950                         return -1;
951                 }
952         }
953
954         tdb->transaction->prepared = true;
955
956         /* expand the file to the new size if needed */
957         if (tdb->map_size != tdb->transaction->old_map_size) {
958                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
959                                              tdb->map_size - 
960                                              tdb->transaction->old_map_size) == -1) {
961                         tdb->ecode = TDB_ERR_IO;
962                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
963                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
964                         tdb_transaction_cancel_internal(tdb);
965                         return -1;
966                 }
967                 tdb->map_size = tdb->transaction->old_map_size;
968                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
969         }
970
971         /* Keep the global lock until the actual commit */
972
973         return 0;
974 }
975
/*
   prepare to commit the current transaction

   Public entry point: records the call in the trace and then hands the
   work to tdb_transaction_prepare_commit_internal(), whose result is
   returned unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	ret = tdb_transaction_prepare_commit_internal(tdb);

	return ret;
}
984
985 /*
986   commit the current transaction
987 */
988 int tdb_transaction_commit(struct tdb_context *tdb)
989 {       
990         const struct tdb_methods *methods;
991         int i;
992
993         if (tdb->transaction == NULL) {
994                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
995                 return -1;
996         }
997
998         /* If we've prepared, can't read seqnum. */
999         if (tdb->transaction->prepared) {
1000                 tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
1001                                  "tdb_transaction_commit");
1002         } else {
1003                 tdb_trace(tdb, "tdb_transaction_commit");
1004         }
1005
1006         if (tdb->transaction->transaction_error) {
1007                 tdb->ecode = TDB_ERR_IO;
1008                 tdb_transaction_cancel(tdb);
1009                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1010                 return -1;
1011         }
1012
1013
1014         if (tdb->transaction->nesting != 0) {
1015                 tdb->transaction->nesting--;
1016                 return 0;
1017         }
1018
1019         /* check for a null transaction */
1020         if (tdb->transaction->blocks == NULL) {
1021                 tdb_transaction_cancel_internal(tdb);
1022                 return 0;
1023         }
1024
1025         if (!tdb->transaction->prepared) {
1026                 int ret = tdb_transaction_prepare_commit_internal(tdb);
1027                 if (ret)
1028                         return ret;
1029         }
1030
1031         methods = tdb->transaction->io_methods;
1032
1033         /* perform all the writes */
1034         for (i=0;i<tdb->transaction->num_blocks;i++) {
1035                 tdb_off_t offset;
1036                 tdb_len_t length;
1037
1038                 if (tdb->transaction->blocks[i] == NULL) {
1039                         continue;
1040                 }
1041
1042                 offset = i * tdb->transaction->block_size;
1043                 length = tdb->transaction->block_size;
1044                 if (i == tdb->transaction->num_blocks-1) {
1045                         length = tdb->transaction->last_block_size;
1046                 }
1047
1048                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1049                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1050                         
1051                         /* we've overwritten part of the data and
1052                            possibly expanded the file, so we need to
1053                            run the crash recovery code */
1054                         tdb->methods = methods;
1055                         tdb_transaction_recover(tdb); 
1056
1057                         tdb_transaction_cancel_internal(tdb);
1058                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1059
1060                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1061                         return -1;
1062                 }
1063                 SAFE_FREE(tdb->transaction->blocks[i]);
1064         } 
1065
1066         SAFE_FREE(tdb->transaction->blocks);
1067         tdb->transaction->num_blocks = 0;
1068
1069         if (!(tdb->flags & TDB_NOSYNC)) {
1070                 /* ensure the new data is on disk */
1071                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1072                         return -1;
1073                 }
1074         }
1075
1076         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1077
1078         /*
1079           TODO: maybe write to some dummy hdr field, or write to magic
1080           offset without mmap, before the last sync, instead of the
1081           utime() call
1082         */
1083
1084         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1085            don't change the mtime of the file, this means the file may
1086            not be backed up (as tdb rounding to block sizes means that
1087            file size changes are quite rare too). The following forces
1088            mtime changes when a transaction completes */
1089 #if HAVE_UTIME
1090         utime(tdb->name, NULL);
1091 #endif
1092
1093         /* use a transaction cancel to free memory and remove the
1094            transaction locks */
1095         tdb_transaction_cancel_internal(tdb);
1096
1097         return 0;
1098 }
1099
1100
1101 /*
1102   recover from an aborted transaction. Must be called with exclusive
1103   database write access already established (including the global
1104   lock to prevent new processes attaching)
1105 */
1106 int tdb_transaction_recover(struct tdb_context *tdb)
1107 {
1108         tdb_off_t recovery_head, recovery_eof;
1109         unsigned char *data, *p;
1110         uint32_t zero = 0;
1111         struct list_struct rec;
1112
1113         /* find the recovery area */
1114         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1115                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1116                 tdb->ecode = TDB_ERR_IO;
1117                 return -1;
1118         }
1119
1120         if (recovery_head == 0) {
1121                 /* we have never allocated a recovery record */
1122                 return 0;
1123         }
1124
1125         /* read the recovery record */
1126         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1127                                    sizeof(rec), DOCONV()) == -1) {
1128                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1129                 tdb->ecode = TDB_ERR_IO;
1130                 return -1;
1131         }
1132
1133         if (rec.magic != TDB_RECOVERY_MAGIC) {
1134                 /* there is no valid recovery data */
1135                 return 0;
1136         }
1137
1138         if (tdb->read_only) {
1139                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1140                 tdb->ecode = TDB_ERR_CORRUPT;
1141                 return -1;
1142         }
1143
1144         recovery_eof = rec.key_len;
1145
1146         data = (unsigned char *)malloc(rec.data_len);
1147         if (data == NULL) {
1148                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1149                 tdb->ecode = TDB_ERR_OOM;
1150                 return -1;
1151         }
1152
1153         /* read the full recovery data */
1154         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1155                                    rec.data_len, 0) == -1) {
1156                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1157                 tdb->ecode = TDB_ERR_IO;
1158                 return -1;
1159         }
1160
1161         /* recover the file data */
1162         p = data;
1163         while (p+8 < data + rec.data_len) {
1164                 uint32_t ofs, len;
1165                 if (DOCONV()) {
1166                         tdb_convert(p, 8);
1167                 }
1168                 memcpy(&ofs, p, 4);
1169                 memcpy(&len, p+4, 4);
1170
1171                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1172                         free(data);
1173                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1174                         tdb->ecode = TDB_ERR_IO;
1175                         return -1;
1176                 }
1177                 p += 8 + len;
1178         }
1179
1180         free(data);
1181
1182         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1183                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1184                 tdb->ecode = TDB_ERR_IO;
1185                 return -1;
1186         }
1187
1188         /* if the recovery area is after the recovered eof then remove it */
1189         if (recovery_eof <= recovery_head) {
1190                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1191                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1192                         tdb->ecode = TDB_ERR_IO;
1193                         return -1;                      
1194                 }
1195         }
1196
1197         /* remove the recovery magic */
1198         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1199                           &zero) == -1) {
1200                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1201                 tdb->ecode = TDB_ERR_IO;
1202                 return -1;                      
1203         }
1204         
1205         /* reduce the file size to the old size */
1206         tdb_munmap(tdb);
1207         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1208                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1209                 tdb->ecode = TDB_ERR_IO;
1210                 return -1;                      
1211         }
1212         tdb->map_size = recovery_eof;
1213         tdb_mmap(tdb);
1214
1215         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1216                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1217                 tdb->ecode = TDB_ERR_IO;
1218                 return -1;
1219         }
1220
1221         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1222                  recovery_eof));
1223
1224         /* all done */
1225         return 0;
1226 }