]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
6909c261a2808ca50c33cb413a5597cde8d4cbd5
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
  - dynamically allocate the transaction recovery record, re-using it
    for subsequent transactions. If a larger record is needed then
    tdb_free() the old record to place it on the normal tdb freelist
    before allocating the new record
45
  - during transactions, keep a list of all the writes that have
    been performed (held as an array of block-sized buffers) by
    intercepting all tdb_write() calls. The hooked transaction
    versions of tdb_read() and tdb_write() check this list and use
    its contents in preference to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
  - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
  - if TDB_NO_NESTING is passed to flags in tdb_open then transaction
    nesting is disabled. tdb_transaction_start() will then implicitly
    cancel any pending transactions and always start a new transaction
    context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* set when a prepare has already occurred */
125         bool prepared;
126         tdb_off_t magic_offset;
127
128         /* old file size before transaction */
129         tdb_len_t old_map_size;
130
131         /* we should re-pack on commit */
132         bool need_repack;
133 };
134
135
136 /*
137   read while in a transaction. We need to check first if the data is in our list
138   of transaction elements, then if not do a real read
139 */
140 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
141                             tdb_len_t len, int cv)
142 {
143         uint32_t blk;
144
145         /* Only a commit is allowed on a prepared transaction */
146         if (tdb->transaction->prepared) {
147                 tdb->ecode = TDB_ERR_EINVAL;
148                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
149                 tdb->transaction->transaction_error = 1;
150                 return -1;
151         }
152
153         /* break it down into block sized ops */
154         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
155                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
156                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
157                         return -1;
158                 }
159                 len -= len2;
160                 off += len2;
161                 buf = (void *)(len2 + (char *)buf);
162         }
163
164         if (len == 0) {
165                 return 0;
166         }
167
168         blk = off / tdb->transaction->block_size;
169
170         /* see if we have it in the block list */
171         if (tdb->transaction->num_blocks <= blk ||
172             tdb->transaction->blocks[blk] == NULL) {
173                 /* nope, do a real read */
174                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177                 return 0;
178         }
179
180         /* it is in the block list. Now check for the last block */
181         if (blk == tdb->transaction->num_blocks-1) {
182                 if (len > tdb->transaction->last_block_size) {
183                         goto fail;
184                 }
185         }
186         
187         /* now copy it out of this block */
188         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
189         if (cv) {
190                 tdb_convert(buf, len);
191         }
192         return 0;
193
194 fail:
195         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
196         tdb->ecode = TDB_ERR_IO;
197         tdb->transaction->transaction_error = 1;
198         return -1;
199 }
200
201
202 /*
203   write while in a transaction
204 */
205 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
206                              const void *buf, tdb_len_t len)
207 {
208         uint32_t blk;
209
210         /* Only a commit is allowed on a prepared transaction */
211         if (tdb->transaction->prepared) {
212                 tdb->ecode = TDB_ERR_EINVAL;
213                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
214                 tdb->transaction->transaction_error = 1;
215                 return -1;
216         }
217
218         /* if the write is to a hash head, then update the transaction
219            hash heads */
220         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
221             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
222                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
223                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
224         }
225
226         /* break it up into block sized chunks */
227         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
228                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
229                 if (transaction_write(tdb, off, buf, len2) != 0) {
230                         return -1;
231                 }
232                 len -= len2;
233                 off += len2;
234                 if (buf != NULL) {
235                         buf = (const void *)(len2 + (const char *)buf);
236                 }
237         }
238
239         if (len == 0) {
240                 return 0;
241         }
242
243         blk = off / tdb->transaction->block_size;
244         off = off % tdb->transaction->block_size;
245
246         if (tdb->transaction->num_blocks <= blk) {
247                 uint8_t **new_blocks;
248                 /* expand the blocks array */
249                 if (tdb->transaction->blocks == NULL) {
250                         new_blocks = (uint8_t **)malloc(
251                                 (blk+1)*sizeof(uint8_t *));
252                 } else {
253                         new_blocks = (uint8_t **)realloc(
254                                 tdb->transaction->blocks,
255                                 (blk+1)*sizeof(uint8_t *));
256                 }
257                 if (new_blocks == NULL) {
258                         tdb->ecode = TDB_ERR_OOM;
259                         goto fail;
260                 }
261                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
262                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
263                 tdb->transaction->blocks = new_blocks;
264                 tdb->transaction->num_blocks = blk+1;
265                 tdb->transaction->last_block_size = 0;
266         }
267
268         /* allocate and fill a block? */
269         if (tdb->transaction->blocks[blk] == NULL) {
270                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
271                 if (tdb->transaction->blocks[blk] == NULL) {
272                         tdb->ecode = TDB_ERR_OOM;
273                         tdb->transaction->transaction_error = 1;
274                         return -1;                      
275                 }
276                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
277                         tdb_len_t len2 = tdb->transaction->block_size;
278                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
279                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
280                         }
281                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
282                                                                    tdb->transaction->blocks[blk], 
283                                                                    len2, 0) != 0) {
284                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
285                                 tdb->ecode = TDB_ERR_IO;
286                                 goto fail;
287                         }
288                         if (blk == tdb->transaction->num_blocks-1) {
289                                 tdb->transaction->last_block_size = len2;
290                         }                       
291                 }
292         }
293         
294         /* overwrite part of an existing block */
295         if (buf == NULL) {
296                 memset(tdb->transaction->blocks[blk] + off, 0, len);
297         } else {
298                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
299         }
300         if (blk == tdb->transaction->num_blocks-1) {
301                 if (len + off > tdb->transaction->last_block_size) {
302                         tdb->transaction->last_block_size = len + off;
303                 }
304         }
305
306         return 0;
307
308 fail:
309         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
310                  (blk*tdb->transaction->block_size) + off, len));
311         tdb->transaction->transaction_error = 1;
312         return -1;
313 }
314
315
316 /*
317   write while in a transaction - this varient never expands the transaction blocks, it only
318   updates existing blocks. This means it cannot change the recovery size
319 */
320 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
321                                       const void *buf, tdb_len_t len)
322 {
323         uint32_t blk;
324
325         /* break it up into block sized chunks */
326         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
327                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
328                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
329                         return -1;
330                 }
331                 len -= len2;
332                 off += len2;
333                 if (buf != NULL) {
334                         buf = (const void *)(len2 + (const char *)buf);
335                 }
336         }
337
338         if (len == 0) {
339                 return 0;
340         }
341
342         blk = off / tdb->transaction->block_size;
343         off = off % tdb->transaction->block_size;
344
345         if (tdb->transaction->num_blocks <= blk ||
346             tdb->transaction->blocks[blk] == NULL) {
347                 return 0;
348         }
349
350         if (blk == tdb->transaction->num_blocks-1 &&
351             off + len > tdb->transaction->last_block_size) {
352                 if (off >= tdb->transaction->last_block_size) {
353                         return 0;
354                 }
355                 len = tdb->transaction->last_block_size - off;
356         }
357
358         /* overwrite part of an existing block */
359         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
360
361         return 0;
362 }
363
364
365 /*
366   accelerated hash chain head search, using the cached hash heads
367 */
368 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
369 {
370         uint32_t h = *chain;
371         for (;h < tdb->header.hash_size;h++) {
372                 /* the +1 takes account of the freelist */
373                 if (0 != tdb->transaction->hash_heads[h+1]) {
374                         break;
375                 }
376         }
377         (*chain) = h;
378 }
379
380 /*
381   out of bounds check during a transaction
382 */
383 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
384 {
385         if (len <= tdb->map_size) {
386                 return 0;
387         }
388         tdb->ecode = TDB_ERR_IO;
389         return -1;
390 }
391
392 /*
393   transaction version of tdb_expand().
394 */
395 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
396                                    tdb_off_t addition)
397 {
398         /* add a write to the transaction elements, so subsequent
399            reads see the zero data */
400         if (transaction_write(tdb, size, NULL, addition) != 0) {
401                 return -1;
402         }
403
404         tdb->transaction->need_repack = true;
405
406         return 0;
407 }
408
409 /*
410   brlock during a transaction - ignore them
411 */
412 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
413                               int rw_type, int lck_type, int probe, size_t len)
414 {
415         return 0;
416 }
417
418 static const struct tdb_methods transaction_methods = {
419         transaction_read,
420         transaction_write,
421         transaction_next_hash_chain,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_brlock
425 };
426
427 /*
428   sync to disk
429 */
430 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
431 {       
432         if (tdb->flags & TDB_NOSYNC) {
433                 return 0;
434         }
435
436         if (fsync(tdb->fd) != 0) {
437                 tdb->ecode = TDB_ERR_IO;
438                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
439                 return -1;
440         }
441 #ifdef MS_SYNC
442         if (tdb->map_ptr) {
443                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
444                 if (msync(moffset + (char *)tdb->map_ptr, 
445                           length + (offset - moffset), MS_SYNC) != 0) {
446                         tdb->ecode = TDB_ERR_IO;
447                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
448                                  strerror(errno)));
449                         return -1;
450                 }
451         }
452 #endif
453         return 0;
454 }
455
456 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
457 {
458         int i, ret = 0;
459
460         if (tdb->transaction == NULL) {
461                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
462                 return -1;
463         }
464
465         if (tdb->transaction->nesting != 0) {
466                 tdb->transaction->transaction_error = 1;
467                 tdb->transaction->nesting--;
468                 return 0;
469         }               
470
471         tdb->map_size = tdb->transaction->old_map_size;
472
473         /* free all the transaction blocks */
474         for (i=0;i<tdb->transaction->num_blocks;i++) {
475                 if (tdb->transaction->blocks[i] != NULL) {
476                         free(tdb->transaction->blocks[i]);
477                 }
478         }
479         SAFE_FREE(tdb->transaction->blocks);
480
481         if (tdb->transaction->magic_offset) {
482                 const struct tdb_methods *methods = tdb->transaction->io_methods;
483                 uint32_t zero = 0;
484
485                 /* remove the recovery marker */
486                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
487                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
488                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
489                         ret = -1;
490                 }
491         }
492
493         /* remove any global lock created during the transaction */
494         if (tdb->global_lock.count != 0) {
495                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
496                 tdb->global_lock.count = 0;
497         }
498
499         /* remove any locks created during the transaction */
500         if (tdb->num_locks != 0) {
501                 for (i=0;i<tdb->num_lockrecs;i++) {
502                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
503                                    F_UNLCK,F_SETLKW, 0, 1);
504                 }
505                 tdb->num_locks = 0;
506                 tdb->num_lockrecs = 0;
507                 SAFE_FREE(tdb->lockrecs);
508         }
509
510         /* restore the normal io methods */
511         tdb->methods = tdb->transaction->io_methods;
512
513         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
514         tdb_transaction_unlock(tdb);
515         SAFE_FREE(tdb->transaction->hash_heads);
516         SAFE_FREE(tdb->transaction);
517         
518         return ret;
519 }
520
521 /*
522   start a tdb transaction. No token is returned, as only a single
523   transaction is allowed to be pending per tdb_context
524 */
525 int tdb_transaction_start(struct tdb_context *tdb)
526 {
527         /* some sanity checks */
528         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
529                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
530                 tdb->ecode = TDB_ERR_EINVAL;
531                 return -1;
532         }
533
534         /* cope with nested tdb_transaction_start() calls */
535         if (tdb->transaction != NULL) {
536                 tdb_trace(tdb, "tdb_transaction_start");
537                 if (!tdb->flags & TDB_NO_NESTING) {
538                         tdb->transaction->nesting++;
539                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
540                                  tdb->transaction->nesting));
541                         return 0;
542                 } else {
543                         tdb_transaction_cancel_internal(tdb);
544                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
545                 }
546         }
547
548         if (tdb->num_locks != 0 || tdb->global_lock.count) {
549                 /* the caller must not have any locks when starting a
550                    transaction as otherwise we'll be screwed by lack
551                    of nested locks in posix */
552                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
553                 tdb->ecode = TDB_ERR_LOCK;
554                 return -1;
555         }
556
557         if (tdb->travlocks.next != NULL) {
558                 /* you cannot use transactions inside a traverse (although you can use
559                    traverse inside a transaction) as otherwise you can end up with
560                    deadlock */
561                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
562                 tdb->ecode = TDB_ERR_LOCK;
563                 return -1;
564         }
565
566         tdb->transaction = (struct tdb_transaction *)
567                 calloc(sizeof(struct tdb_transaction), 1);
568         if (tdb->transaction == NULL) {
569                 tdb->ecode = TDB_ERR_OOM;
570                 return -1;
571         }
572
573         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
574         tdb->transaction->block_size = tdb->page_size;
575
576         /* get the transaction write lock. This is a blocking lock. As
577            discussed with Volker, there are a number of ways we could
578            make this async, which we will probably do in the future */
579         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
580                 SAFE_FREE(tdb->transaction->blocks);
581                 SAFE_FREE(tdb->transaction);
582                 return -1;
583         }
584         
585         /* get a read lock from the freelist to the end of file. This
586            is upgraded to a write lock during the commit */
587         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
588                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
589                 tdb->ecode = TDB_ERR_LOCK;
590                 goto fail;
591         }
592
593         /* setup a copy of the hash table heads so the hash scan in
594            traverse can be fast */
595         tdb->transaction->hash_heads = (uint32_t *)
596                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
597         if (tdb->transaction->hash_heads == NULL) {
598                 tdb->ecode = TDB_ERR_OOM;
599                 goto fail;
600         }
601         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
602                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
603                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
604                 tdb->ecode = TDB_ERR_IO;
605                 goto fail;
606         }
607
608         /* make sure we know about any file expansions already done by
609            anyone else */
610         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
611         tdb->transaction->old_map_size = tdb->map_size;
612
613         /* finally hook the io methods, replacing them with
614            transaction specific methods */
615         tdb->transaction->io_methods = tdb->methods;
616         tdb->methods = &transaction_methods;
617
618         /* Trace at the end, so we get sequence number correct. */
619         tdb_trace(tdb, "tdb_transaction_start");
620         return 0;
621         
622 fail:
623         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
624         tdb_transaction_unlock(tdb);
625         SAFE_FREE(tdb->transaction->blocks);
626         SAFE_FREE(tdb->transaction->hash_heads);
627         SAFE_FREE(tdb->transaction);
628         return -1;
629 }
630
631
/*
  Public entry point for cancelling the current transaction: records
  a trace entry, then hands over to tdb_transaction_cancel_internal()
  and returns its result.
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	int rc;

	tdb_trace(tdb, "tdb_transaction_cancel");
	rc = tdb_transaction_cancel_internal(tdb);

	return rc;
}
640
641 /*
642   work out how much space the linearised recovery data will consume
643 */
644 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
645 {
646         tdb_len_t recovery_size = 0;
647         int i;
648
649         recovery_size = sizeof(uint32_t);
650         for (i=0;i<tdb->transaction->num_blocks;i++) {
651                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
652                         break;
653                 }
654                 if (tdb->transaction->blocks[i] == NULL) {
655                         continue;
656                 }
657                 recovery_size += 2*sizeof(tdb_off_t);
658                 if (i == tdb->transaction->num_blocks-1) {
659                         recovery_size += tdb->transaction->last_block_size;
660                 } else {
661                         recovery_size += tdb->transaction->block_size;
662                 }
663         }       
664
665         return recovery_size;
666 }
667
668 /*
669   allocate the recovery area, or use an existing recovery area if it is
670   large enough
671 */
672 static int tdb_recovery_allocate(struct tdb_context *tdb, 
673                                  tdb_len_t *recovery_size,
674                                  tdb_off_t *recovery_offset,
675                                  tdb_len_t *recovery_max_size)
676 {
677         struct list_struct rec;
678         const struct tdb_methods *methods = tdb->transaction->io_methods;
679         tdb_off_t recovery_head;
680
681         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
682                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
683                 return -1;
684         }
685
686         rec.rec_len = 0;
687
688         if (recovery_head != 0 && 
689             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
690                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
691                 return -1;
692         }
693
694         *recovery_size = tdb_recovery_size(tdb);
695
696         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
697                 /* it fits in the existing area */
698                 *recovery_max_size = rec.rec_len;
699                 *recovery_offset = recovery_head;
700                 return 0;
701         }
702
703         /* we need to free up the old recovery area, then allocate a
704            new one at the end of the file. Note that we cannot use
705            tdb_allocate() to allocate the new one as that might return
706            us an area that is being currently used (as of the start of
707            the transaction) */
708         if (recovery_head != 0) {
709                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
710                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
711                         return -1;
712                 }
713         }
714
715         /* the tdb_free() call might have increased the recovery size */
716         *recovery_size = tdb_recovery_size(tdb);
717
718         /* round up to a multiple of page size */
719         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
720         *recovery_offset = tdb->map_size;
721         recovery_head = *recovery_offset;
722
723         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
724                                      (tdb->map_size - tdb->transaction->old_map_size) +
725                                      sizeof(rec) + *recovery_max_size) == -1) {
726                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
727                 return -1;
728         }
729
730         /* remap the file (if using mmap) */
731         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
732
733         /* we have to reset the old map size so that we don't try to expand the file
734            again in the transaction commit, which would destroy the recovery area */
735         tdb->transaction->old_map_size = tdb->map_size;
736
737         /* write the recovery header offset and sync - we can sync without a race here
738            as the magic ptr in the recovery record has not been set */
739         CONVERT(recovery_head);
740         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
741                                &recovery_head, sizeof(tdb_off_t)) == -1) {
742                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
743                 return -1;
744         }
745         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
746                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
747                 return -1;
748         }
749
750         return 0;
751 }
752
753
754 /*
755   setup the recovery data that will be used on a crash during commit
756 */
757 static int transaction_setup_recovery(struct tdb_context *tdb, 
758                                       tdb_off_t *magic_offset)
759 {
760         tdb_len_t recovery_size;
761         unsigned char *data, *p;
762         const struct tdb_methods *methods = tdb->transaction->io_methods;
763         struct list_struct *rec;
764         tdb_off_t recovery_offset, recovery_max_size;
765         tdb_off_t old_map_size = tdb->transaction->old_map_size;
766         uint32_t magic, tailer;
767         int i;
768
769         /*
770           check that the recovery area has enough space
771         */
772         if (tdb_recovery_allocate(tdb, &recovery_size, 
773                                   &recovery_offset, &recovery_max_size) == -1) {
774                 return -1;
775         }
776
777         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
778         if (data == NULL) {
779                 tdb->ecode = TDB_ERR_OOM;
780                 return -1;
781         }
782
783         rec = (struct list_struct *)data;
784         memset(rec, 0, sizeof(*rec));
785
786         rec->magic    = 0;
787         rec->data_len = recovery_size;
788         rec->rec_len  = recovery_max_size;
789         rec->key_len  = old_map_size;
790         CONVERT(rec);
791
792         /* build the recovery data into a single blob to allow us to do a single
793            large write, which should be more efficient */
794         p = data + sizeof(*rec);
795         for (i=0;i<tdb->transaction->num_blocks;i++) {
796                 tdb_off_t offset;
797                 tdb_len_t length;
798
799                 if (tdb->transaction->blocks[i] == NULL) {
800                         continue;
801                 }
802
803                 offset = i * tdb->transaction->block_size;
804                 length = tdb->transaction->block_size;
805                 if (i == tdb->transaction->num_blocks-1) {
806                         length = tdb->transaction->last_block_size;
807                 }
808                 
809                 if (offset >= old_map_size) {
810                         continue;
811                 }
812                 if (offset + length > tdb->transaction->old_map_size) {
813                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
814                         free(data);
815                         tdb->ecode = TDB_ERR_CORRUPT;
816                         return -1;
817                 }
818                 memcpy(p, &offset, 4);
819                 memcpy(p+4, &length, 4);
820                 if (DOCONV()) {
821                         tdb_convert(p, 8);
822                 }
823                 /* the recovery area contains the old data, not the
824                    new data, so we have to call the original tdb_read
825                    method to get it */
826                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
827                         free(data);
828                         tdb->ecode = TDB_ERR_IO;
829                         return -1;
830                 }
831                 p += 8 + length;
832         }
833
834         /* and the tailer */
835         tailer = sizeof(*rec) + recovery_max_size;
836         memcpy(p, &tailer, 4);
837         CONVERT(p);
838
839         /* write the recovery data to the recovery area */
840         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
841                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
842                 free(data);
843                 tdb->ecode = TDB_ERR_IO;
844                 return -1;
845         }
846         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
847                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
848                 free(data);
849                 tdb->ecode = TDB_ERR_IO;
850                 return -1;
851         }
852
853         /* as we don't have ordered writes, we have to sync the recovery
854            data before we update the magic to indicate that the recovery
855            data is present */
856         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
857                 free(data);
858                 return -1;
859         }
860
861         free(data);
862
863         magic = TDB_RECOVERY_MAGIC;
864         CONVERT(magic);
865
866         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
867
868         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
869                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
870                 tdb->ecode = TDB_ERR_IO;
871                 return -1;
872         }
873         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
874                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
875                 tdb->ecode = TDB_ERR_IO;
876                 return -1;
877         }
878
879         /* ensure the recovery magic marker is on disk */
880         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
881                 return -1;
882         }
883
884         return 0;
885 }
886
887 static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
888 {       
889         const struct tdb_methods *methods;
890
891         if (tdb->transaction == NULL) {
892                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
893                 return -1;
894         }
895
896         if (tdb->transaction->prepared) {
897                 tdb->ecode = TDB_ERR_EINVAL;
898                 tdb_transaction_cancel(tdb);
899                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
900                 return -1;
901         }
902
903         if (tdb->transaction->transaction_error) {
904                 tdb->ecode = TDB_ERR_IO;
905                 tdb_transaction_cancel_internal(tdb);
906                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
907                 return -1;
908         }
909
910
911         if (tdb->transaction->nesting != 0) {
912                 tdb->transaction->nesting--;
913                 return 0;
914         }               
915
916 #ifdef TDB_TRACE
917         /* store seqnum now, before reading becomes illegal. */
918         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
919 #endif
920
921         /* check for a null transaction */
922         if (tdb->transaction->blocks == NULL) {
923                 return 0;
924         }
925
926         methods = tdb->transaction->io_methods;
927         
928         /* if there are any locks pending then the caller has not
929            nested their locks properly, so fail the transaction */
930         if (tdb->num_locks || tdb->global_lock.count) {
931                 tdb->ecode = TDB_ERR_LOCK;
932                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
933                 tdb_transaction_cancel_internal(tdb);
934                 return -1;
935         }
936
937         /* upgrade the main transaction lock region to a write lock */
938         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
939                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
940                 tdb->ecode = TDB_ERR_LOCK;
941                 tdb_transaction_cancel_internal(tdb);
942                 return -1;
943         }
944
945         /* get the global lock - this prevents new users attaching to the database
946            during the commit */
947         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
948                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
949                 tdb->ecode = TDB_ERR_LOCK;
950                 tdb_transaction_cancel_internal(tdb);
951                 return -1;
952         }
953
954         if (!(tdb->flags & TDB_NOSYNC)) {
955                 /* write the recovery data to the end of the file */
956                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
957                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
958                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
959                         tdb_transaction_cancel_internal(tdb);
960                         return -1;
961                 }
962         }
963
964         tdb->transaction->prepared = true;
965
966         /* expand the file to the new size if needed */
967         if (tdb->map_size != tdb->transaction->old_map_size) {
968                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
969                                              tdb->map_size - 
970                                              tdb->transaction->old_map_size) == -1) {
971                         tdb->ecode = TDB_ERR_IO;
972                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
973                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
974                         tdb_transaction_cancel_internal(tdb);
975                         return -1;
976                 }
977                 tdb->map_size = tdb->transaction->old_map_size;
978                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
979         }
980
981         /* Keep the global lock until the actual commit */
982
983         return 0;
984 }
985
/*
   prepare to commit the current transaction

   Public entry point: records a trace entry and then runs the shared
   prepare step. Returns whatever the internal prepare step returns.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int status;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	status = tdb_transaction_prepare_commit_internal(tdb);
	return status;
}
994
995 /*
996   commit the current transaction
997 */
998 int tdb_transaction_commit(struct tdb_context *tdb)
999 {       
1000         const struct tdb_methods *methods;
1001         int i;
1002         bool need_repack;
1003
1004         if (tdb->transaction == NULL) {
1005                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1006                 return -1;
1007         }
1008
1009         /* If we've prepared, can't read seqnum. */
1010         if (tdb->transaction->prepared) {
1011                 tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
1012                                  "tdb_transaction_commit");
1013         } else {
1014                 tdb_trace(tdb, "tdb_transaction_commit");
1015         }
1016
1017         if (tdb->transaction->transaction_error) {
1018                 tdb->ecode = TDB_ERR_IO;
1019                 tdb_transaction_cancel(tdb);
1020                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1021                 return -1;
1022         }
1023
1024
1025         if (tdb->transaction->nesting != 0) {
1026                 tdb->transaction->nesting--;
1027                 return 0;
1028         }
1029
1030         /* check for a null transaction */
1031         if (tdb->transaction->blocks == NULL) {
1032                 tdb_transaction_cancel_internal(tdb);
1033                 return 0;
1034         }
1035
1036         if (!tdb->transaction->prepared) {
1037                 int ret = tdb_transaction_prepare_commit_internal(tdb);
1038                 if (ret)
1039                         return ret;
1040         }
1041
1042         methods = tdb->transaction->io_methods;
1043
1044         /* perform all the writes */
1045         for (i=0;i<tdb->transaction->num_blocks;i++) {
1046                 tdb_off_t offset;
1047                 tdb_len_t length;
1048
1049                 if (tdb->transaction->blocks[i] == NULL) {
1050                         continue;
1051                 }
1052
1053                 offset = i * tdb->transaction->block_size;
1054                 length = tdb->transaction->block_size;
1055                 if (i == tdb->transaction->num_blocks-1) {
1056                         length = tdb->transaction->last_block_size;
1057                 }
1058
1059                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1060                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1061                         
1062                         /* we've overwritten part of the data and
1063                            possibly expanded the file, so we need to
1064                            run the crash recovery code */
1065                         tdb->methods = methods;
1066                         tdb_transaction_recover(tdb); 
1067
1068                         tdb_transaction_cancel_internal(tdb);
1069                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1070
1071                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1072                         return -1;
1073                 }
1074                 SAFE_FREE(tdb->transaction->blocks[i]);
1075         } 
1076
1077         SAFE_FREE(tdb->transaction->blocks);
1078         tdb->transaction->num_blocks = 0;
1079
1080         /* ensure the new data is on disk */
1081         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1082                 return -1;
1083         }
1084
1085         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1086
1087         /*
1088           TODO: maybe write to some dummy hdr field, or write to magic
1089           offset without mmap, before the last sync, instead of the
1090           utime() call
1091         */
1092
1093         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1094            don't change the mtime of the file, this means the file may
1095            not be backed up (as tdb rounding to block sizes means that
1096            file size changes are quite rare too). The following forces
1097            mtime changes when a transaction completes */
1098 #if HAVE_UTIME
1099         utime(tdb->name, NULL);
1100 #endif
1101
1102         need_repack = tdb->transaction->need_repack;
1103
1104         /* use a transaction cancel to free memory and remove the
1105            transaction locks */
1106         tdb_transaction_cancel_internal(tdb);
1107
1108         if (need_repack) {
1109                 return tdb_repack(tdb);
1110         }
1111
1112         return 0;
1113 }
1114
1115
1116 /*
1117   recover from an aborted transaction. Must be called with exclusive
1118   database write access already established (including the global
1119   lock to prevent new processes attaching)
1120 */
1121 int tdb_transaction_recover(struct tdb_context *tdb)
1122 {
1123         tdb_off_t recovery_head, recovery_eof;
1124         unsigned char *data, *p;
1125         uint32_t zero = 0;
1126         struct list_struct rec;
1127
1128         /* find the recovery area */
1129         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1130                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1131                 tdb->ecode = TDB_ERR_IO;
1132                 return -1;
1133         }
1134
1135         if (recovery_head == 0) {
1136                 /* we have never allocated a recovery record */
1137                 return 0;
1138         }
1139
1140         /* read the recovery record */
1141         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1142                                    sizeof(rec), DOCONV()) == -1) {
1143                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1144                 tdb->ecode = TDB_ERR_IO;
1145                 return -1;
1146         }
1147
1148         if (rec.magic != TDB_RECOVERY_MAGIC) {
1149                 /* there is no valid recovery data */
1150                 return 0;
1151         }
1152
1153         if (tdb->read_only) {
1154                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1155                 tdb->ecode = TDB_ERR_CORRUPT;
1156                 return -1;
1157         }
1158
1159         recovery_eof = rec.key_len;
1160
1161         data = (unsigned char *)malloc(rec.data_len);
1162         if (data == NULL) {
1163                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1164                 tdb->ecode = TDB_ERR_OOM;
1165                 return -1;
1166         }
1167
1168         /* read the full recovery data */
1169         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1170                                    rec.data_len, 0) == -1) {
1171                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1172                 tdb->ecode = TDB_ERR_IO;
1173                 return -1;
1174         }
1175
1176         /* recover the file data */
1177         p = data;
1178         while (p+8 < data + rec.data_len) {
1179                 uint32_t ofs, len;
1180                 if (DOCONV()) {
1181                         tdb_convert(p, 8);
1182                 }
1183                 memcpy(&ofs, p, 4);
1184                 memcpy(&len, p+4, 4);
1185
1186                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1187                         free(data);
1188                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1189                         tdb->ecode = TDB_ERR_IO;
1190                         return -1;
1191                 }
1192                 p += 8 + len;
1193         }
1194
1195         free(data);
1196
1197         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1198                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1199                 tdb->ecode = TDB_ERR_IO;
1200                 return -1;
1201         }
1202
1203         /* if the recovery area is after the recovered eof then remove it */
1204         if (recovery_eof <= recovery_head) {
1205                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1206                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1207                         tdb->ecode = TDB_ERR_IO;
1208                         return -1;                      
1209                 }
1210         }
1211
1212         /* remove the recovery magic */
1213         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1214                           &zero) == -1) {
1215                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1216                 tdb->ecode = TDB_ERR_IO;
1217                 return -1;                      
1218         }
1219         
1220         /* reduce the file size to the old size */
1221         tdb_munmap(tdb);
1222         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1223                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1224                 tdb->ecode = TDB_ERR_IO;
1225                 return -1;                      
1226         }
1227         tdb->map_size = recovery_eof;
1228         tdb_mmap(tdb);
1229
1230         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1231                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1232                 tdb->ecode = TDB_ERR_IO;
1233                 return -1;
1234         }
1235
1236         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1237                  recovery_eof));
1238
1239         /* all done */
1240         return 0;
1241 }