]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
0944bb36e98a6a3af4e32ac2052e512483bc4311
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* set when a prepare has already occurred */
125         bool prepared;
126         tdb_off_t magic_offset;
127
128         /* old file size before transaction */
129         tdb_len_t old_map_size;
130
131         /* we should re-pack on commit */
132         bool need_repack;
133 };
134
135
136 /*
137   read while in a transaction. We need to check first if the data is in our list
138   of transaction elements, then if not do a real read
139 */
140 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
141                             tdb_len_t len, int cv)
142 {
143         uint32_t blk;
144
145         /* Only a commit is allowed on a prepared transaction */
146         if (tdb->transaction->prepared) {
147                 tdb->ecode = TDB_ERR_EINVAL;
148                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
149                 tdb->transaction->transaction_error = 1;
150                 return -1;
151         }
152
153         /* break it down into block sized ops */
154         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
155                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
156                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
157                         return -1;
158                 }
159                 len -= len2;
160                 off += len2;
161                 buf = (void *)(len2 + (char *)buf);
162         }
163
164         if (len == 0) {
165                 return 0;
166         }
167
168         blk = off / tdb->transaction->block_size;
169
170         /* see if we have it in the block list */
171         if (tdb->transaction->num_blocks <= blk ||
172             tdb->transaction->blocks[blk] == NULL) {
173                 /* nope, do a real read */
174                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177                 return 0;
178         }
179
180         /* it is in the block list. Now check for the last block */
181         if (blk == tdb->transaction->num_blocks-1) {
182                 if (len > tdb->transaction->last_block_size) {
183                         goto fail;
184                 }
185         }
186         
187         /* now copy it out of this block */
188         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
189         if (cv) {
190                 tdb_convert(buf, len);
191         }
192         return 0;
193
194 fail:
195         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
196         tdb->ecode = TDB_ERR_IO;
197         tdb->transaction->transaction_error = 1;
198         return -1;
199 }
200
201
202 /*
203   write while in a transaction
204 */
205 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
206                              const void *buf, tdb_len_t len)
207 {
208         uint32_t blk;
209
210         /* Only a commit is allowed on a prepared transaction */
211         if (tdb->transaction->prepared) {
212                 tdb->ecode = TDB_ERR_EINVAL;
213                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
214                 tdb->transaction->transaction_error = 1;
215                 return -1;
216         }
217
218         /* if the write is to a hash head, then update the transaction
219            hash heads */
220         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
221             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
222                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
223                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
224         }
225
226         /* break it up into block sized chunks */
227         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
228                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
229                 if (transaction_write(tdb, off, buf, len2) != 0) {
230                         return -1;
231                 }
232                 len -= len2;
233                 off += len2;
234                 if (buf != NULL) {
235                         buf = (const void *)(len2 + (const char *)buf);
236                 }
237         }
238
239         if (len == 0) {
240                 return 0;
241         }
242
243         blk = off / tdb->transaction->block_size;
244         off = off % tdb->transaction->block_size;
245
246         if (tdb->transaction->num_blocks <= blk) {
247                 uint8_t **new_blocks;
248                 /* expand the blocks array */
249                 if (tdb->transaction->blocks == NULL) {
250                         new_blocks = (uint8_t **)malloc(
251                                 (blk+1)*sizeof(uint8_t *));
252                 } else {
253                         new_blocks = (uint8_t **)realloc(
254                                 tdb->transaction->blocks,
255                                 (blk+1)*sizeof(uint8_t *));
256                 }
257                 if (new_blocks == NULL) {
258                         tdb->ecode = TDB_ERR_OOM;
259                         goto fail;
260                 }
261                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
262                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
263                 tdb->transaction->blocks = new_blocks;
264                 tdb->transaction->num_blocks = blk+1;
265                 tdb->transaction->last_block_size = 0;
266         }
267
268         /* allocate and fill a block? */
269         if (tdb->transaction->blocks[blk] == NULL) {
270                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
271                 if (tdb->transaction->blocks[blk] == NULL) {
272                         tdb->ecode = TDB_ERR_OOM;
273                         tdb->transaction->transaction_error = 1;
274                         return -1;                      
275                 }
276                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
277                         tdb_len_t len2 = tdb->transaction->block_size;
278                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
279                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
280                         }
281                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
282                                                                    tdb->transaction->blocks[blk], 
283                                                                    len2, 0) != 0) {
284                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
285                                 tdb->ecode = TDB_ERR_IO;
286                                 goto fail;
287                         }
288                         if (blk == tdb->transaction->num_blocks-1) {
289                                 tdb->transaction->last_block_size = len2;
290                         }                       
291                 }
292         }
293         
294         /* overwrite part of an existing block */
295         if (buf == NULL) {
296                 memset(tdb->transaction->blocks[blk] + off, 0, len);
297         } else {
298                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
299         }
300         if (blk == tdb->transaction->num_blocks-1) {
301                 if (len + off > tdb->transaction->last_block_size) {
302                         tdb->transaction->last_block_size = len + off;
303                 }
304         }
305
306         return 0;
307
308 fail:
309         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
310                  (blk*tdb->transaction->block_size) + off, len));
311         tdb->transaction->transaction_error = 1;
312         return -1;
313 }
314
315
316 /*
317   write while in a transaction - this varient never expands the transaction blocks, it only
318   updates existing blocks. This means it cannot change the recovery size
319 */
320 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
321                                       const void *buf, tdb_len_t len)
322 {
323         uint32_t blk;
324
325         /* break it up into block sized chunks */
326         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
327                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
328                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
329                         return -1;
330                 }
331                 len -= len2;
332                 off += len2;
333                 if (buf != NULL) {
334                         buf = (const void *)(len2 + (const char *)buf);
335                 }
336         }
337
338         if (len == 0) {
339                 return 0;
340         }
341
342         blk = off / tdb->transaction->block_size;
343         off = off % tdb->transaction->block_size;
344
345         if (tdb->transaction->num_blocks <= blk ||
346             tdb->transaction->blocks[blk] == NULL) {
347                 return 0;
348         }
349
350         if (blk == tdb->transaction->num_blocks-1 &&
351             off + len > tdb->transaction->last_block_size) {
352                 if (off >= tdb->transaction->last_block_size) {
353                         return 0;
354                 }
355                 len = tdb->transaction->last_block_size - off;
356         }
357
358         /* overwrite part of an existing block */
359         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
360
361         return 0;
362 }
363
364
365 /*
366   accelerated hash chain head search, using the cached hash heads
367 */
368 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
369 {
370         uint32_t h = *chain;
371         for (;h < tdb->header.hash_size;h++) {
372                 /* the +1 takes account of the freelist */
373                 if (0 != tdb->transaction->hash_heads[h+1]) {
374                         break;
375                 }
376         }
377         (*chain) = h;
378 }
379
380 /*
381   out of bounds check during a transaction
382 */
383 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
384 {
385         if (len <= tdb->map_size) {
386                 return 0;
387         }
388         return TDB_ERRCODE(TDB_ERR_IO, -1);
389 }
390
391 /*
392   transaction version of tdb_expand().
393 */
394 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
395                                    tdb_off_t addition)
396 {
397         /* add a write to the transaction elements, so subsequent
398            reads see the zero data */
399         if (transaction_write(tdb, size, NULL, addition) != 0) {
400                 return -1;
401         }
402
403         tdb->transaction->need_repack = true;
404
405         return 0;
406 }
407
408 /*
409   brlock during a transaction - ignore them
410 */
411 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
412                               int rw_type, int lck_type, int probe, size_t len)
413 {
414         return 0;
415 }
416
417 static const struct tdb_methods transaction_methods = {
418         transaction_read,
419         transaction_write,
420         transaction_next_hash_chain,
421         transaction_oob,
422         transaction_expand_file,
423         transaction_brlock
424 };
425
426 /*
427   sync to disk
428 */
429 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
430 {       
431         if (tdb->flags & TDB_NOSYNC) {
432                 return 0;
433         }
434
435         if (fsync(tdb->fd) != 0) {
436                 tdb->ecode = TDB_ERR_IO;
437                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
438                 return -1;
439         }
440 #ifdef MS_SYNC
441         if (tdb->map_ptr) {
442                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
443                 if (msync(moffset + (char *)tdb->map_ptr, 
444                           length + (offset - moffset), MS_SYNC) != 0) {
445                         tdb->ecode = TDB_ERR_IO;
446                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
447                                  strerror(errno)));
448                         return -1;
449                 }
450         }
451 #endif
452         return 0;
453 }
454
455 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
456 {
457         int i, ret = 0;
458
459         if (tdb->transaction == NULL) {
460                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
461                 return -1;
462         }
463
464         if (tdb->transaction->nesting != 0) {
465                 tdb->transaction->transaction_error = 1;
466                 tdb->transaction->nesting--;
467                 return 0;
468         }               
469
470         tdb->map_size = tdb->transaction->old_map_size;
471
472         /* free all the transaction blocks */
473         for (i=0;i<tdb->transaction->num_blocks;i++) {
474                 if (tdb->transaction->blocks[i] != NULL) {
475                         free(tdb->transaction->blocks[i]);
476                 }
477         }
478         SAFE_FREE(tdb->transaction->blocks);
479
480         if (tdb->transaction->magic_offset) {
481                 const struct tdb_methods *methods = tdb->transaction->io_methods;
482                 uint32_t zero = 0;
483
484                 /* remove the recovery marker */
485                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
486                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
487                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
488                         ret = -1;
489                 }
490         }
491
492         /* remove any global lock created during the transaction */
493         if (tdb->global_lock.count != 0) {
494                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
495                 tdb->global_lock.count = 0;
496         }
497
498         /* remove any locks created during the transaction */
499         if (tdb->num_locks != 0) {
500                 for (i=0;i<tdb->num_lockrecs;i++) {
501                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
502                                    F_UNLCK,F_SETLKW, 0, 1);
503                 }
504                 tdb->num_locks = 0;
505                 tdb->num_lockrecs = 0;
506                 SAFE_FREE(tdb->lockrecs);
507         }
508
509         /* restore the normal io methods */
510         tdb->methods = tdb->transaction->io_methods;
511
512         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
513         tdb_transaction_unlock(tdb);
514         SAFE_FREE(tdb->transaction->hash_heads);
515         SAFE_FREE(tdb->transaction);
516         
517         return ret;
518 }
519
520 /*
521   start a tdb transaction. No token is returned, as only a single
522   transaction is allowed to be pending per tdb_context
523 */
524 int tdb_transaction_start(struct tdb_context *tdb)
525 {
526         /* some sanity checks */
527         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
528                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
529                 tdb->ecode = TDB_ERR_EINVAL;
530                 return -1;
531         }
532
533         /* cope with nested tdb_transaction_start() calls */
534         if (tdb->transaction != NULL) {
535                 tdb_trace(tdb, "tdb_transaction_start");
536                 if (!tdb->flags & TDB_NO_NESTING) {
537                         tdb->transaction->nesting++;
538                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
539                                  tdb->transaction->nesting));
540                         return 0;
541                 } else {
542                         tdb_transaction_cancel_internal(tdb);
543                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
544                 }
545         }
546
547         if (tdb->num_locks != 0 || tdb->global_lock.count) {
548                 /* the caller must not have any locks when starting a
549                    transaction as otherwise we'll be screwed by lack
550                    of nested locks in posix */
551                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
552                 tdb->ecode = TDB_ERR_LOCK;
553                 return -1;
554         }
555
556         if (tdb->travlocks.next != NULL) {
557                 /* you cannot use transactions inside a traverse (although you can use
558                    traverse inside a transaction) as otherwise you can end up with
559                    deadlock */
560                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
561                 tdb->ecode = TDB_ERR_LOCK;
562                 return -1;
563         }
564
565         tdb->transaction = (struct tdb_transaction *)
566                 calloc(sizeof(struct tdb_transaction), 1);
567         if (tdb->transaction == NULL) {
568                 tdb->ecode = TDB_ERR_OOM;
569                 return -1;
570         }
571
572         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
573         tdb->transaction->block_size = tdb->page_size;
574
575         /* get the transaction write lock. This is a blocking lock. As
576            discussed with Volker, there are a number of ways we could
577            make this async, which we will probably do in the future */
578         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
579                 SAFE_FREE(tdb->transaction->blocks);
580                 SAFE_FREE(tdb->transaction);
581                 return -1;
582         }
583         
584         /* get a read lock from the freelist to the end of file. This
585            is upgraded to a write lock during the commit */
586         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
587                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
588                 tdb->ecode = TDB_ERR_LOCK;
589                 goto fail;
590         }
591
592         /* setup a copy of the hash table heads so the hash scan in
593            traverse can be fast */
594         tdb->transaction->hash_heads = (uint32_t *)
595                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
596         if (tdb->transaction->hash_heads == NULL) {
597                 tdb->ecode = TDB_ERR_OOM;
598                 goto fail;
599         }
600         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
601                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
602                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
603                 tdb->ecode = TDB_ERR_IO;
604                 goto fail;
605         }
606
607         /* make sure we know about any file expansions already done by
608            anyone else */
609         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
610         tdb->transaction->old_map_size = tdb->map_size;
611
612         /* finally hook the io methods, replacing them with
613            transaction specific methods */
614         tdb->transaction->io_methods = tdb->methods;
615         tdb->methods = &transaction_methods;
616
617         /* Trace at the end, so we get sequence number correct. */
618         tdb_trace(tdb, "tdb_transaction_start");
619         return 0;
620         
621 fail:
622         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
623         tdb_transaction_unlock(tdb);
624         SAFE_FREE(tdb->transaction->blocks);
625         SAFE_FREE(tdb->transaction->hash_heads);
626         SAFE_FREE(tdb->transaction);
627         return -1;
628 }
629
630
/*
  public entry point for cancelling the current transaction: records
  a trace entry and then hands over to
  tdb_transaction_cancel_internal(), whose result is returned as-is.
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_cancel");
	ret = tdb_transaction_cancel_internal(tdb);
	return ret;
}
639
640 /*
641   work out how much space the linearised recovery data will consume
642 */
643 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
644 {
645         tdb_len_t recovery_size = 0;
646         int i;
647
648         recovery_size = sizeof(uint32_t);
649         for (i=0;i<tdb->transaction->num_blocks;i++) {
650                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
651                         break;
652                 }
653                 if (tdb->transaction->blocks[i] == NULL) {
654                         continue;
655                 }
656                 recovery_size += 2*sizeof(tdb_off_t);
657                 if (i == tdb->transaction->num_blocks-1) {
658                         recovery_size += tdb->transaction->last_block_size;
659                 } else {
660                         recovery_size += tdb->transaction->block_size;
661                 }
662         }       
663
664         return recovery_size;
665 }
666
667 /*
668   allocate the recovery area, or use an existing recovery area if it is
669   large enough
670 */
671 static int tdb_recovery_allocate(struct tdb_context *tdb, 
672                                  tdb_len_t *recovery_size,
673                                  tdb_off_t *recovery_offset,
674                                  tdb_len_t *recovery_max_size)
675 {
676         struct list_struct rec;
677         const struct tdb_methods *methods = tdb->transaction->io_methods;
678         tdb_off_t recovery_head;
679
680         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
681                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
682                 return -1;
683         }
684
685         rec.rec_len = 0;
686
687         if (recovery_head != 0 && 
688             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
689                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
690                 return -1;
691         }
692
693         *recovery_size = tdb_recovery_size(tdb);
694
695         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
696                 /* it fits in the existing area */
697                 *recovery_max_size = rec.rec_len;
698                 *recovery_offset = recovery_head;
699                 return 0;
700         }
701
702         /* we need to free up the old recovery area, then allocate a
703            new one at the end of the file. Note that we cannot use
704            tdb_allocate() to allocate the new one as that might return
705            us an area that is being currently used (as of the start of
706            the transaction) */
707         if (recovery_head != 0) {
708                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
709                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
710                         return -1;
711                 }
712         }
713
714         /* the tdb_free() call might have increased the recovery size */
715         *recovery_size = tdb_recovery_size(tdb);
716
717         /* round up to a multiple of page size */
718         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
719         *recovery_offset = tdb->map_size;
720         recovery_head = *recovery_offset;
721
722         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
723                                      (tdb->map_size - tdb->transaction->old_map_size) +
724                                      sizeof(rec) + *recovery_max_size) == -1) {
725                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
726                 return -1;
727         }
728
729         /* remap the file (if using mmap) */
730         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
731
732         /* we have to reset the old map size so that we don't try to expand the file
733            again in the transaction commit, which would destroy the recovery area */
734         tdb->transaction->old_map_size = tdb->map_size;
735
736         /* write the recovery header offset and sync - we can sync without a race here
737            as the magic ptr in the recovery record has not been set */
738         CONVERT(recovery_head);
739         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
740                                &recovery_head, sizeof(tdb_off_t)) == -1) {
741                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
742                 return -1;
743         }
744         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
745                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
746                 return -1;
747         }
748
749         return 0;
750 }
751
752
753 /*
754   setup the recovery data that will be used on a crash during commit
755 */
756 static int transaction_setup_recovery(struct tdb_context *tdb, 
757                                       tdb_off_t *magic_offset)
758 {
759         tdb_len_t recovery_size;
760         unsigned char *data, *p;
761         const struct tdb_methods *methods = tdb->transaction->io_methods;
762         struct list_struct *rec;
763         tdb_off_t recovery_offset, recovery_max_size;
764         tdb_off_t old_map_size = tdb->transaction->old_map_size;
765         uint32_t magic, tailer;
766         int i;
767
768         /*
769           check that the recovery area has enough space
770         */
771         if (tdb_recovery_allocate(tdb, &recovery_size, 
772                                   &recovery_offset, &recovery_max_size) == -1) {
773                 return -1;
774         }
775
776         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
777         if (data == NULL) {
778                 tdb->ecode = TDB_ERR_OOM;
779                 return -1;
780         }
781
782         rec = (struct list_struct *)data;
783         memset(rec, 0, sizeof(*rec));
784
785         rec->magic    = 0;
786         rec->data_len = recovery_size;
787         rec->rec_len  = recovery_max_size;
788         rec->key_len  = old_map_size;
789         CONVERT(rec);
790
791         /* build the recovery data into a single blob to allow us to do a single
792            large write, which should be more efficient */
793         p = data + sizeof(*rec);
794         for (i=0;i<tdb->transaction->num_blocks;i++) {
795                 tdb_off_t offset;
796                 tdb_len_t length;
797
798                 if (tdb->transaction->blocks[i] == NULL) {
799                         continue;
800                 }
801
802                 offset = i * tdb->transaction->block_size;
803                 length = tdb->transaction->block_size;
804                 if (i == tdb->transaction->num_blocks-1) {
805                         length = tdb->transaction->last_block_size;
806                 }
807                 
808                 if (offset >= old_map_size) {
809                         continue;
810                 }
811                 if (offset + length > tdb->transaction->old_map_size) {
812                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
813                         free(data);
814                         tdb->ecode = TDB_ERR_CORRUPT;
815                         return -1;
816                 }
817                 memcpy(p, &offset, 4);
818                 memcpy(p+4, &length, 4);
819                 if (DOCONV()) {
820                         tdb_convert(p, 8);
821                 }
822                 /* the recovery area contains the old data, not the
823                    new data, so we have to call the original tdb_read
824                    method to get it */
825                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
826                         free(data);
827                         tdb->ecode = TDB_ERR_IO;
828                         return -1;
829                 }
830                 p += 8 + length;
831         }
832
833         /* and the tailer */
834         tailer = sizeof(*rec) + recovery_max_size;
835         memcpy(p, &tailer, 4);
836         CONVERT(p);
837
838         /* write the recovery data to the recovery area */
839         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
840                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
841                 free(data);
842                 tdb->ecode = TDB_ERR_IO;
843                 return -1;
844         }
845         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
846                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
847                 free(data);
848                 tdb->ecode = TDB_ERR_IO;
849                 return -1;
850         }
851
852         /* as we don't have ordered writes, we have to sync the recovery
853            data before we update the magic to indicate that the recovery
854            data is present */
855         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
856                 free(data);
857                 return -1;
858         }
859
860         free(data);
861
862         magic = TDB_RECOVERY_MAGIC;
863         CONVERT(magic);
864
865         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
866
867         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
868                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
869                 tdb->ecode = TDB_ERR_IO;
870                 return -1;
871         }
872         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
873                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
874                 tdb->ecode = TDB_ERR_IO;
875                 return -1;
876         }
877
878         /* ensure the recovery magic marker is on disk */
879         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
880                 return -1;
881         }
882
883         return 0;
884 }
885
886 static int tdb_transaction_prepare_commit_internal(struct tdb_context *tdb)
887 {       
888         const struct tdb_methods *methods;
889
890         if (tdb->transaction == NULL) {
891                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
892                 return -1;
893         }
894
895         if (tdb->transaction->prepared) {
896                 tdb->ecode = TDB_ERR_EINVAL;
897                 tdb_transaction_cancel(tdb);
898                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
899                 return -1;
900         }
901
902         if (tdb->transaction->transaction_error) {
903                 tdb->ecode = TDB_ERR_IO;
904                 tdb_transaction_cancel_internal(tdb);
905                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
906                 return -1;
907         }
908
909
910         if (tdb->transaction->nesting != 0) {
911                 tdb->transaction->nesting--;
912                 return 0;
913         }               
914
915 #ifdef TDB_TRACE
916         /* store seqnum now, before reading becomes illegal. */
917         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &tdb->transaction_prepare_seqnum);
918 #endif
919
920         /* check for a null transaction */
921         if (tdb->transaction->blocks == NULL) {
922                 return 0;
923         }
924
925         methods = tdb->transaction->io_methods;
926         
927         /* if there are any locks pending then the caller has not
928            nested their locks properly, so fail the transaction */
929         if (tdb->num_locks || tdb->global_lock.count) {
930                 tdb->ecode = TDB_ERR_LOCK;
931                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
932                 tdb_transaction_cancel_internal(tdb);
933                 return -1;
934         }
935
936         /* upgrade the main transaction lock region to a write lock */
937         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
938                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
939                 tdb->ecode = TDB_ERR_LOCK;
940                 tdb_transaction_cancel_internal(tdb);
941                 return -1;
942         }
943
944         /* get the global lock - this prevents new users attaching to the database
945            during the commit */
946         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
947                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
948                 tdb->ecode = TDB_ERR_LOCK;
949                 tdb_transaction_cancel_internal(tdb);
950                 return -1;
951         }
952
953         if (!(tdb->flags & TDB_NOSYNC)) {
954                 /* write the recovery data to the end of the file */
955                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
956                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
957                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
958                         tdb_transaction_cancel_internal(tdb);
959                         return -1;
960                 }
961         }
962
963         tdb->transaction->prepared = true;
964
965         /* expand the file to the new size if needed */
966         if (tdb->map_size != tdb->transaction->old_map_size) {
967                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
968                                              tdb->map_size - 
969                                              tdb->transaction->old_map_size) == -1) {
970                         tdb->ecode = TDB_ERR_IO;
971                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
972                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
973                         tdb_transaction_cancel_internal(tdb);
974                         return -1;
975                 }
976                 tdb->map_size = tdb->transaction->old_map_size;
977                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
978         }
979
980         /* Keep the global lock until the actual commit */
981
982         return 0;
983 }
984
/*
   prepare to commit the current transaction

   Records a trace entry for the call, then hands over to
   tdb_transaction_prepare_commit_internal() and returns its result.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int rc;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	rc = tdb_transaction_prepare_commit_internal(tdb);
	return rc;
}
993
994 /*
995   commit the current transaction
996 */
997 int tdb_transaction_commit(struct tdb_context *tdb)
998 {       
999         const struct tdb_methods *methods;
1000         int i;
1001         bool need_repack;
1002
1003         if (tdb->transaction == NULL) {
1004                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1005                 return -1;
1006         }
1007
1008         /* If we've prepared, can't read seqnum. */
1009         if (tdb->transaction->prepared) {
1010                 tdb_trace_seqnum(tdb, tdb->transaction_prepare_seqnum,
1011                                  "tdb_transaction_commit");
1012         } else {
1013                 tdb_trace(tdb, "tdb_transaction_commit");
1014         }
1015
1016         if (tdb->transaction->transaction_error) {
1017                 tdb->ecode = TDB_ERR_IO;
1018                 tdb_transaction_cancel(tdb);
1019                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1020                 return -1;
1021         }
1022
1023
1024         if (tdb->transaction->nesting != 0) {
1025                 tdb->transaction->nesting--;
1026                 return 0;
1027         }
1028
1029         /* check for a null transaction */
1030         if (tdb->transaction->blocks == NULL) {
1031                 tdb_transaction_cancel_internal(tdb);
1032                 return 0;
1033         }
1034
1035         if (!tdb->transaction->prepared) {
1036                 int ret = tdb_transaction_prepare_commit_internal(tdb);
1037                 if (ret)
1038                         return ret;
1039         }
1040
1041         methods = tdb->transaction->io_methods;
1042
1043         /* perform all the writes */
1044         for (i=0;i<tdb->transaction->num_blocks;i++) {
1045                 tdb_off_t offset;
1046                 tdb_len_t length;
1047
1048                 if (tdb->transaction->blocks[i] == NULL) {
1049                         continue;
1050                 }
1051
1052                 offset = i * tdb->transaction->block_size;
1053                 length = tdb->transaction->block_size;
1054                 if (i == tdb->transaction->num_blocks-1) {
1055                         length = tdb->transaction->last_block_size;
1056                 }
1057
1058                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1059                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1060                         
1061                         /* we've overwritten part of the data and
1062                            possibly expanded the file, so we need to
1063                            run the crash recovery code */
1064                         tdb->methods = methods;
1065                         tdb_transaction_recover(tdb); 
1066
1067                         tdb_transaction_cancel_internal(tdb);
1068                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1069
1070                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1071                         return -1;
1072                 }
1073                 SAFE_FREE(tdb->transaction->blocks[i]);
1074         } 
1075
1076         SAFE_FREE(tdb->transaction->blocks);
1077         tdb->transaction->num_blocks = 0;
1078
1079         /* ensure the new data is on disk */
1080         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1081                 return -1;
1082         }
1083
1084         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1085
1086         /*
1087           TODO: maybe write to some dummy hdr field, or write to magic
1088           offset without mmap, before the last sync, instead of the
1089           utime() call
1090         */
1091
1092         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1093            don't change the mtime of the file, this means the file may
1094            not be backed up (as tdb rounding to block sizes means that
1095            file size changes are quite rare too). The following forces
1096            mtime changes when a transaction completes */
1097 #if HAVE_UTIME
1098         utime(tdb->name, NULL);
1099 #endif
1100
1101         need_repack = tdb->transaction->need_repack;
1102
1103         /* use a transaction cancel to free memory and remove the
1104            transaction locks */
1105         tdb_transaction_cancel_internal(tdb);
1106
1107         if (need_repack) {
1108                 return tdb_repack(tdb);
1109         }
1110
1111         return 0;
1112 }
1113
1114
1115 /*
1116   recover from an aborted transaction. Must be called with exclusive
1117   database write access already established (including the global
1118   lock to prevent new processes attaching)
1119 */
1120 int tdb_transaction_recover(struct tdb_context *tdb)
1121 {
1122         tdb_off_t recovery_head, recovery_eof;
1123         unsigned char *data, *p;
1124         uint32_t zero = 0;
1125         struct list_struct rec;
1126
1127         /* find the recovery area */
1128         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1129                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1130                 tdb->ecode = TDB_ERR_IO;
1131                 return -1;
1132         }
1133
1134         if (recovery_head == 0) {
1135                 /* we have never allocated a recovery record */
1136                 return 0;
1137         }
1138
1139         /* read the recovery record */
1140         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1141                                    sizeof(rec), DOCONV()) == -1) {
1142                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1143                 tdb->ecode = TDB_ERR_IO;
1144                 return -1;
1145         }
1146
1147         if (rec.magic != TDB_RECOVERY_MAGIC) {
1148                 /* there is no valid recovery data */
1149                 return 0;
1150         }
1151
1152         if (tdb->read_only) {
1153                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1154                 tdb->ecode = TDB_ERR_CORRUPT;
1155                 return -1;
1156         }
1157
1158         recovery_eof = rec.key_len;
1159
1160         data = (unsigned char *)malloc(rec.data_len);
1161         if (data == NULL) {
1162                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1163                 tdb->ecode = TDB_ERR_OOM;
1164                 return -1;
1165         }
1166
1167         /* read the full recovery data */
1168         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1169                                    rec.data_len, 0) == -1) {
1170                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1171                 tdb->ecode = TDB_ERR_IO;
1172                 return -1;
1173         }
1174
1175         /* recover the file data */
1176         p = data;
1177         while (p+8 < data + rec.data_len) {
1178                 uint32_t ofs, len;
1179                 if (DOCONV()) {
1180                         tdb_convert(p, 8);
1181                 }
1182                 memcpy(&ofs, p, 4);
1183                 memcpy(&len, p+4, 4);
1184
1185                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1186                         free(data);
1187                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1188                         tdb->ecode = TDB_ERR_IO;
1189                         return -1;
1190                 }
1191                 p += 8 + len;
1192         }
1193
1194         free(data);
1195
1196         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1197                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1198                 tdb->ecode = TDB_ERR_IO;
1199                 return -1;
1200         }
1201
1202         /* if the recovery area is after the recovered eof then remove it */
1203         if (recovery_eof <= recovery_head) {
1204                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1205                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1206                         tdb->ecode = TDB_ERR_IO;
1207                         return -1;                      
1208                 }
1209         }
1210
1211         /* remove the recovery magic */
1212         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1213                           &zero) == -1) {
1214                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1215                 tdb->ecode = TDB_ERR_IO;
1216                 return -1;                      
1217         }
1218         
1219         /* reduce the file size to the old size */
1220         tdb_munmap(tdb);
1221         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1222                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1223                 tdb->ecode = TDB_ERR_IO;
1224                 return -1;                      
1225         }
1226         tdb->map_size = recovery_eof;
1227         tdb_mmap(tdb);
1228
1229         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1230                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1231                 tdb->ecode = TDB_ERR_IO;
1232                 return -1;
1233         }
1234
1235         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1236                  recovery_eof));
1237
1238         /* all done */
1239         return 0;
1240 }