]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
tdb2: use magic freetable value rather than different magic for coalescing
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocated the transaction recover record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of writes all that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit stategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->read(tdb, off, buf, len) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
169         return 0;
170
171 fail:
172         tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
173                    "transaction_read: failed at off=%zu len=%zu",
174                    (size_t)off, (size_t)len);
175         tdb->transaction->transaction_error = 1;
176         return -1;
177 }
178
179
180 /*
181   write while in a transaction
182 */
183 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
184                              const void *buf, tdb_len_t len)
185 {
186         size_t blk;
187
188         /* Only a commit is allowed on a prepared transaction */
189         if (tdb->transaction->prepared) {
190                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_FATAL,
191                          "transaction_write: transaction already prepared,"
192                          " write not allowed");
193                 goto fail;
194         }
195
196         /* break it up into block sized chunks */
197         while (len + (off % getpagesize()) > getpagesize()) {
198                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
199                 if (transaction_write(tdb, off, buf, len2) != 0) {
200                         return -1;
201                 }
202                 len -= len2;
203                 off += len2;
204                 if (buf != NULL) {
205                         buf = (const void *)(len2 + (const char *)buf);
206                 }
207         }
208
209         if (len == 0) {
210                 return 0;
211         }
212
213         blk = off / getpagesize();
214         off = off % getpagesize();
215
216         if (tdb->transaction->num_blocks <= blk) {
217                 uint8_t **new_blocks;
218                 /* expand the blocks array */
219                 if (tdb->transaction->blocks == NULL) {
220                         new_blocks = (uint8_t **)malloc(
221                                 (blk+1)*sizeof(uint8_t *));
222                 } else {
223                         new_blocks = (uint8_t **)realloc(
224                                 tdb->transaction->blocks,
225                                 (blk+1)*sizeof(uint8_t *));
226                 }
227                 if (new_blocks == NULL) {
228                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
229                                    "transaction_write: failed to allocate");
230                         goto fail;
231                 }
232                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
233                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
234                 tdb->transaction->blocks = new_blocks;
235                 tdb->transaction->num_blocks = blk+1;
236                 tdb->transaction->last_block_size = 0;
237         }
238
239         /* allocate and fill a block? */
240         if (tdb->transaction->blocks[blk] == NULL) {
241                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
242                 if (tdb->transaction->blocks[blk] == NULL) {
243                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
244                                    "transaction_write: failed to allocate");
245                         goto fail;
246                 }
247                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
248                         tdb_len_t len2 = getpagesize();
249                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
250                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
251                         }
252                         if (tdb->transaction->io_methods->read(tdb, blk * getpagesize(),
253                                                                tdb->transaction->blocks[blk],
254                                                                len2) != 0) {
255                                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
256                                            "transaction_write: failed to"
257                                            " read old block: %s",
258                                            strerror(errno));
259                                 SAFE_FREE(tdb->transaction->blocks[blk]);
260                                 goto fail;
261                         }
262                         if (blk == tdb->transaction->num_blocks-1) {
263                                 tdb->transaction->last_block_size = len2;
264                         }
265                 }
266         }
267
268         /* overwrite part of an existing block */
269         if (buf == NULL) {
270                 memset(tdb->transaction->blocks[blk] + off, 0, len);
271         } else {
272                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
273         }
274         if (blk == tdb->transaction->num_blocks-1) {
275                 if (len + off > tdb->transaction->last_block_size) {
276                         tdb->transaction->last_block_size = len + off;
277                 }
278         }
279
280         return 0;
281
282 fail:
283         tdb->transaction->transaction_error = 1;
284         return -1;
285 }
286
287
288 /*
289   write while in a transaction - this varient never expands the transaction blocks, it only
290   updates existing blocks. This means it cannot change the recovery size
291 */
292 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
293                                        const void *buf, tdb_len_t len)
294 {
295         size_t blk;
296
297         /* break it up into block sized chunks */
298         while (len + (off % getpagesize()) > getpagesize()) {
299                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
300                 transaction_write_existing(tdb, off, buf, len2);
301                 len -= len2;
302                 off += len2;
303                 if (buf != NULL) {
304                         buf = (const void *)(len2 + (const char *)buf);
305                 }
306         }
307
308         if (len == 0) {
309                 return;
310         }
311
312         blk = off / getpagesize();
313         off = off % getpagesize();
314
315         if (tdb->transaction->num_blocks <= blk ||
316             tdb->transaction->blocks[blk] == NULL) {
317                 return;
318         }
319
320         if (blk == tdb->transaction->num_blocks-1 &&
321             off + len > tdb->transaction->last_block_size) {
322                 if (off >= tdb->transaction->last_block_size) {
323                         return;
324                 }
325                 len = tdb->transaction->last_block_size - off;
326         }
327
328         /* overwrite part of an existing block */
329         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
330 }
331
332
333 /*
334   out of bounds check during a transaction
335 */
336 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
337 {
338         if (len <= tdb->map_size) {
339                 return 0;
340         }
341         tdb->ecode = TDB_ERR_IO;
342         if (!probe) {
343                 tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
344                            "tdb_oob len %lld beyond transaction size %lld",
345                            (long long)len,
346                            (long long)tdb->map_size);
347         }
348         return -1;
349 }
350
351 /*
352   transaction version of tdb_expand().
353 */
354 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
355 {
356         /* add a write to the transaction elements, so subsequent
357            reads see the zero data */
358         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
359                 return -1;
360         }
361         tdb->map_size += addition;
362         return 0;
363 }
364
365 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
366                                 size_t len)
367 {
368         /* FIXME */
369         return NULL;
370 }
371
372 static const struct tdb_methods transaction_methods = {
373         transaction_read,
374         transaction_write,
375         transaction_oob,
376         transaction_expand_file,
377         transaction_direct,
378 };
379
380 /*
381   sync to disk
382 */
383 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
384 {
385         if (tdb->flags & TDB_NOSYNC) {
386                 return 0;
387         }
388
389         if (fsync(tdb->fd) != 0) {
390                 tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
391                            "tdb_transaction: fsync failed: %s",
392                            strerror(errno));
393                 return -1;
394         }
395 #ifdef MS_SYNC
396         if (tdb->map_ptr) {
397                 tdb_off_t moffset = offset & ~(getpagesize()-1);
398                 if (msync(moffset + (char *)tdb->map_ptr,
399                           length + (offset - moffset), MS_SYNC) != 0) {
400                         tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_FATAL,
401                                    "tdb_transaction: msync failed: %s",
402                                    strerror(errno));
403                         return -1;
404                 }
405         }
406 #endif
407         return 0;
408 }
409
410
411 static void _tdb_transaction_cancel(struct tdb_context *tdb)
412 {
413         int i;
414
415         if (tdb->transaction == NULL) {
416                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
417                            "tdb_transaction_cancel: no transaction");
418                 return;
419         }
420
421         if (tdb->transaction->nesting != 0) {
422                 tdb->transaction->transaction_error = 1;
423                 tdb->transaction->nesting--;
424                 return;
425         }
426
427         tdb->map_size = tdb->transaction->old_map_size;
428
429         /* free all the transaction blocks */
430         for (i=0;i<tdb->transaction->num_blocks;i++) {
431                 if (tdb->transaction->blocks[i] != NULL) {
432                         free(tdb->transaction->blocks[i]);
433                 }
434         }
435         SAFE_FREE(tdb->transaction->blocks);
436
437         if (tdb->transaction->magic_offset) {
438                 const struct tdb_methods *methods = tdb->transaction->io_methods;
439                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
440
441                 /* remove the recovery marker */
442                 if (methods->write(tdb, tdb->transaction->magic_offset,
443                                    &invalid, sizeof(invalid)) == -1 ||
444                     transaction_sync(tdb, tdb->transaction->magic_offset,
445                                      sizeof(invalid)) == -1) {
446                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
447                                    "tdb_transaction_cancel: failed to remove"
448                                    " recovery magic");
449                 }
450         }
451
452         if (tdb->allrecord_lock.count)
453                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
454
455         /* restore the normal io methods */
456         tdb->methods = tdb->transaction->io_methods;
457
458         tdb_transaction_unlock(tdb, F_WRLCK);
459
460         if (tdb_has_open_lock(tdb))
461                 tdb_unlock_open(tdb);
462
463         SAFE_FREE(tdb->transaction);
464 }
465
466 /*
467   start a tdb transaction. No token is returned, as only a single
468   transaction is allowed to be pending per tdb_context
469 */
470 int tdb_transaction_start(struct tdb_context *tdb)
471 {
472         /* some sanity checks */
473         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
474                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
475                            "tdb_transaction_start: cannot start a transaction"
476                            " on a read-only or internal db");
477                 return -1;
478         }
479
480         /* cope with nested tdb_transaction_start() calls */
481         if (tdb->transaction != NULL) {
482                 tdb_logerr(tdb, TDB_ERR_NESTING, TDB_DEBUG_ERROR,
483                            "tdb_transaction_start:"
484                            " already inside transaction");
485                 return -1;
486         }
487
488         if (tdb_has_hash_locks(tdb)) {
489                 /* the caller must not have any locks when starting a
490                    transaction as otherwise we'll be screwed by lack
491                    of nested locks in posix */
492                 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_DEBUG_ERROR,
493                            "tdb_transaction_start: cannot start a transaction"
494                            " with locks held");
495                 return -1;
496         }
497
498         tdb->transaction = (struct tdb_transaction *)
499                 calloc(sizeof(struct tdb_transaction), 1);
500         if (tdb->transaction == NULL) {
501                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_ERROR,
502                            "tdb_transaction_start: cannot allocate");
503                 return -1;
504         }
505
506         /* get the transaction write lock. This is a blocking lock. As
507            discussed with Volker, there are a number of ways we could
508            make this async, which we will probably do in the future */
509         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
510                 SAFE_FREE(tdb->transaction->blocks);
511                 SAFE_FREE(tdb->transaction);
512                 return -1;
513         }
514
515         /* get a read lock over entire file. This is upgraded to a write
516            lock during the commit */
517         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
518                 goto fail_allrecord_lock;
519         }
520
521         /* make sure we know about any file expansions already done by
522            anyone else */
523         tdb->methods->oob(tdb, tdb->map_size + 1, true);
524         tdb->transaction->old_map_size = tdb->map_size;
525
526         /* finally hook the io methods, replacing them with
527            transaction specific methods */
528         tdb->transaction->io_methods = tdb->methods;
529         tdb->methods = &transaction_methods;
530         return 0;
531
532 fail_allrecord_lock:
533         tdb_transaction_unlock(tdb, F_WRLCK);
534         SAFE_FREE(tdb->transaction->blocks);
535         SAFE_FREE(tdb->transaction);
536         return -1;
537 }
538
539
540 /*
541   cancel the current transaction
542 */
543 void tdb_transaction_cancel(struct tdb_context *tdb)
544 {
545         _tdb_transaction_cancel(tdb);
546 }
547
548 /*
549   work out how much space the linearised recovery data will consume
550 */
551 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
552 {
553         tdb_len_t recovery_size = 0;
554         int i;
555
556         recovery_size = sizeof(tdb_len_t);
557         for (i=0;i<tdb->transaction->num_blocks;i++) {
558                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
559                         break;
560                 }
561                 if (tdb->transaction->blocks[i] == NULL) {
562                         continue;
563                 }
564                 recovery_size += 2*sizeof(tdb_off_t);
565                 if (i == tdb->transaction->num_blocks-1) {
566                         recovery_size += tdb->transaction->last_block_size;
567                 } else {
568                         recovery_size += getpagesize();
569                 }
570         }
571
572         return recovery_size;
573 }
574
575 /*
576   allocate the recovery area, or use an existing recovery area if it is
577   large enough
578 */
579 static int tdb_recovery_allocate(struct tdb_context *tdb,
580                                  tdb_len_t *recovery_size,
581                                  tdb_off_t *recovery_offset,
582                                  tdb_len_t *recovery_max_size)
583 {
584         struct tdb_recovery_record rec;
585         const struct tdb_methods *methods = tdb->transaction->io_methods;
586         tdb_off_t recovery_head;
587         size_t addition;
588
589         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
590         if (recovery_head == TDB_OFF_ERR) {
591                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
592                          "tdb_recovery_allocate:"
593                          " failed to read recovery head");
594                 return -1;
595         }
596
597         if (recovery_head != 0) {
598                 if (methods->read(tdb, recovery_head, &rec, sizeof(rec))) {
599                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
600                                  "tdb_recovery_allocate:"
601                                  " failed to read recovery record");
602                         return -1;
603                 }
604                 tdb_convert(tdb, &rec, sizeof(rec));
605                 /* ignore invalid recovery regions: can happen in crash */
606                 if (rec.magic != TDB_RECOVERY_MAGIC &&
607                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
608                         recovery_head = 0;
609                 }
610         }
611
612         *recovery_size = tdb_recovery_size(tdb);
613
614         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
615                 /* it fits in the existing area */
616                 *recovery_max_size = rec.max_len;
617                 *recovery_offset = recovery_head;
618                 return 0;
619         }
620
621         /* we need to free up the old recovery area, then allocate a
622            new one at the end of the file. Note that we cannot use
623            normal allocation to allocate the new one as that might return
624            us an area that is being currently used (as of the start of
625            the transaction) */
626         if (recovery_head != 0) {
627                 add_stat(tdb, frees, 1);
628                 if (add_free_record(tdb, recovery_head,
629                                     sizeof(rec) + rec.max_len) != 0) {
630                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
631                                    "tdb_recovery_allocate:"
632                                    " failed to free previous recovery area");
633                         return -1;
634                 }
635         }
636
637         /* the tdb_free() call might have increased the recovery size */
638         *recovery_size = tdb_recovery_size(tdb);
639
640         /* round up to a multiple of page size */
641         *recovery_max_size
642                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
643                    & ~(getpagesize()-1))
644                 - sizeof(rec);
645         *recovery_offset = tdb->map_size;
646         recovery_head = *recovery_offset;
647
648         /* Restore ->map_size before calling underlying expand_file.
649            Also so that we don't try to expand the file again in the
650            transaction commit, which would destroy the recovery
651            area */
652         addition = (tdb->map_size - tdb->transaction->old_map_size) +
653                 sizeof(rec) + *recovery_max_size;
654         tdb->map_size = tdb->transaction->old_map_size;
655         if (methods->expand_file(tdb, addition) == -1) {
656                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
657                          "tdb_recovery_allocate:"
658                          " failed to create recovery area");
659                 return -1;
660         }
661
662         /* we have to reset the old map size so that we don't try to
663            expand the file again in the transaction commit, which
664            would destroy the recovery area */
665         tdb->transaction->old_map_size = tdb->map_size;
666
667         /* write the recovery header offset and sync - we can sync without a race here
668            as the magic ptr in the recovery record has not been set */
669         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
670         if (methods->write(tdb, offsetof(struct tdb_header, recovery),
671                            &recovery_head, sizeof(tdb_off_t)) == -1) {
672                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
673                          "tdb_recovery_allocate:"
674                          " failed to write recovery head");
675                 return -1;
676         }
677         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
678                                    &recovery_head,
679                                    sizeof(tdb_off_t));
680         return 0;
681 }
682
683 /* Set up header for the recovery record. */
684 static void set_recovery_header(struct tdb_recovery_record *rec,
685                                 uint64_t magic,
686                                 uint64_t datalen, uint64_t actuallen,
687                                 uint64_t oldsize)
688 {
689         rec->magic = magic;
690         rec->max_len = actuallen;
691         rec->len = datalen;
692         rec->eof = oldsize;
693 }
694
695 /*
696   setup the recovery data that will be used on a crash during commit
697 */
698 static int transaction_setup_recovery(struct tdb_context *tdb,
699                                       tdb_off_t *magic_offset)
700 {
701         tdb_len_t recovery_size;
702         unsigned char *data, *p;
703         const struct tdb_methods *methods = tdb->transaction->io_methods;
704         struct tdb_recovery_record *rec;
705         tdb_off_t recovery_offset, recovery_max_size;
706         tdb_off_t old_map_size = tdb->transaction->old_map_size;
707         uint64_t magic, tailer;
708         int i;
709
710         /*
711           check that the recovery area has enough space
712         */
713         if (tdb_recovery_allocate(tdb, &recovery_size,
714                                   &recovery_offset, &recovery_max_size) == -1) {
715                 return -1;
716         }
717
718         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
719         if (data == NULL) {
720                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
721                            "transaction_setup_recovery: cannot allocate");
722                 return -1;
723         }
724
725         rec = (struct tdb_recovery_record *)data;
726         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
727                             recovery_size, recovery_max_size, old_map_size);
728         tdb_convert(tdb, rec, sizeof(*rec));
729
730         /* build the recovery data into a single blob to allow us to do a single
731            large write, which should be more efficient */
732         p = data + sizeof(*rec);
733         for (i=0;i<tdb->transaction->num_blocks;i++) {
734                 tdb_off_t offset;
735                 tdb_len_t length;
736
737                 if (tdb->transaction->blocks[i] == NULL) {
738                         continue;
739                 }
740
741                 offset = i * getpagesize();
742                 length = getpagesize();
743                 if (i == tdb->transaction->num_blocks-1) {
744                         length = tdb->transaction->last_block_size;
745                 }
746
747                 if (offset >= old_map_size) {
748                         continue;
749                 }
750                 if (offset + length > tdb->map_size) {
751                         tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_DEBUG_FATAL,
752                                    "tdb_transaction_setup_recovery:"
753                                    " transaction data over new region boundary");
754                         free(data);
755                         return -1;
756                 }
757                 memcpy(p, &offset, sizeof(offset));
758                 memcpy(p + sizeof(offset), &length, sizeof(length));
759                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
760
761                 /* the recovery area contains the old data, not the
762                    new data, so we have to call the original tdb_read
763                    method to get it */
764                 if (methods->read(tdb, offset,
765                                   p + sizeof(offset) + sizeof(length),
766                                   length) != 0) {
767                         free(data);
768                         return -1;
769                 }
770                 p += sizeof(offset) + sizeof(length) + length;
771         }
772
773         /* and the tailer */
774         tailer = sizeof(*rec) + recovery_max_size;
775         memcpy(p, &tailer, sizeof(tailer));
776         tdb_convert(tdb, p, sizeof(tailer));
777
778         /* write the recovery data to the recovery area */
779         if (methods->write(tdb, recovery_offset, data,
780                            sizeof(*rec) + recovery_size) == -1) {
781                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
782                          "tdb_transaction_setup_recovery:"
783                          " failed to write recovery data");
784                 free(data);
785                 return -1;
786         }
787         transaction_write_existing(tdb, recovery_offset, data,
788                                    sizeof(*rec) + recovery_size);
789
790         /* as we don't have ordered writes, we have to sync the recovery
791            data before we update the magic to indicate that the recovery
792            data is present */
793         if (transaction_sync(tdb, recovery_offset,
794                              sizeof(*rec) + recovery_size) == -1) {
795                 free(data);
796                 return -1;
797         }
798
799         free(data);
800
801         magic = TDB_RECOVERY_MAGIC;
802         tdb_convert(tdb, &magic, sizeof(magic));
803
804         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
805                                                    magic);
806
807         if (methods->write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
808                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
809                          "tdb_transaction_setup_recovery:"
810                          " failed to write recovery magic");
811                 return -1;
812         }
813         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
814
815         /* ensure the recovery magic marker is on disk */
816         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
817                 return -1;
818         }
819
820         return 0;
821 }
822
823 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
824 {
825         const struct tdb_methods *methods;
826
827         if (tdb->transaction == NULL) {
828                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
829                            "tdb_transaction_prepare_commit: no transaction");
830                 return -1;
831         }
832
833         if (tdb->transaction->prepared) {
834                 _tdb_transaction_cancel(tdb);
835                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
836                            "tdb_transaction_prepare_commit:"
837                            " transaction already prepared");
838                 return -1;
839         }
840
841         if (tdb->transaction->transaction_error) {
842                 _tdb_transaction_cancel(tdb);
843                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
844                            "tdb_transaction_prepare_commit:"
845                            " transaction error pending");
846                 return -1;
847         }
848
849
850         if (tdb->transaction->nesting != 0) {
851                 tdb->transaction->nesting--;
852                 return 0;
853         }
854
855         /* check for a null transaction */
856         if (tdb->transaction->blocks == NULL) {
857                 return 0;
858         }
859
860         methods = tdb->transaction->io_methods;
861
862         /* upgrade the main transaction lock region to a write lock */
863         if (tdb_allrecord_upgrade(tdb) == -1) {
864                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
865                          "tdb_transaction_prepare_commit:"
866                          " failed to upgrade hash locks");
867                 _tdb_transaction_cancel(tdb);
868                 return -1;
869         }
870
871         /* get the open lock - this prevents new users attaching to the database
872            during the commit */
873         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
874                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
875                          "tdb_transaction_prepare_commit:"
876                          " failed to get open lock");
877                 _tdb_transaction_cancel(tdb);
878                 return -1;
879         }
880
881         /* Since we have whole db locked, we don't need the expansion lock. */
882         if (!(tdb->flags & TDB_NOSYNC)) {
883                 /* write the recovery data to the end of the file */
884                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
885                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
886                                  "tdb_transaction_prepare_commit:"
887                                  " failed to setup recovery data");
888                         _tdb_transaction_cancel(tdb);
889                         return -1;
890                 }
891         }
892
893         tdb->transaction->prepared = true;
894
895         /* expand the file to the new size if needed */
896         if (tdb->map_size != tdb->transaction->old_map_size) {
897                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
898                 /* Restore original map size for tdb_expand_file */
899                 tdb->map_size = tdb->transaction->old_map_size;
900                 if (methods->expand_file(tdb, add) == -1) {
901                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_ERROR,
902                                  "tdb_transaction_prepare_commit:"
903                                  " expansion failed");
904                         _tdb_transaction_cancel(tdb);
905                         return -1;
906                 }
907         }
908
909         /* Keep the open lock until the actual commit */
910
911         return 0;
912 }
913
914 /*
915    prepare to commit the current transaction
916 */
917 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
918 {
919         return _tdb_transaction_prepare_commit(tdb);
920 }
921
922 /*
923   commit the current transaction
924 */
925 int tdb_transaction_commit(struct tdb_context *tdb)
926 {
927         const struct tdb_methods *methods;
928         int i;
929
930         if (tdb->transaction == NULL) {
931                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_DEBUG_ERROR,
932                          "tdb_transaction_commit: no transaction");
933                 return -1;
934         }
935
936         tdb_trace(tdb, "tdb_transaction_commit");
937
938         if (tdb->transaction->transaction_error) {
939                 tdb_transaction_cancel(tdb);
940                 tdb_logerr(tdb, TDB_ERR_IO, TDB_DEBUG_ERROR,
941                            "tdb_transaction_commit:"
942                            " transaction error pending");
943                 return -1;
944         }
945
946
947         if (tdb->transaction->nesting != 0) {
948                 tdb->transaction->nesting--;
949                 return 0;
950         }
951
952         /* check for a null transaction */
953         if (tdb->transaction->blocks == NULL) {
954                 _tdb_transaction_cancel(tdb);
955                 return 0;
956         }
957
958         if (!tdb->transaction->prepared) {
959                 int ret = _tdb_transaction_prepare_commit(tdb);
960                 if (ret)
961                         return ret;
962         }
963
964         methods = tdb->transaction->io_methods;
965
966         /* perform all the writes */
967         for (i=0;i<tdb->transaction->num_blocks;i++) {
968                 tdb_off_t offset;
969                 tdb_len_t length;
970
971                 if (tdb->transaction->blocks[i] == NULL) {
972                         continue;
973                 }
974
975                 offset = i * getpagesize();
976                 length = getpagesize();
977                 if (i == tdb->transaction->num_blocks-1) {
978                         length = tdb->transaction->last_block_size;
979                 }
980
981                 if (methods->write(tdb, offset, tdb->transaction->blocks[i],
982                                    length) == -1) {
983                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
984                                    "tdb_transaction_commit:"
985                                    " write failed during commit");
986
987                         /* we've overwritten part of the data and
988                            possibly expanded the file, so we need to
989                            run the crash recovery code */
990                         tdb->methods = methods;
991                         tdb_transaction_recover(tdb);
992
993                         _tdb_transaction_cancel(tdb);
994
995                         return -1;
996                 }
997                 SAFE_FREE(tdb->transaction->blocks[i]);
998         }
999
1000         SAFE_FREE(tdb->transaction->blocks);
1001         tdb->transaction->num_blocks = 0;
1002
1003         /* ensure the new data is on disk */
1004         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1005                 return -1;
1006         }
1007
1008         /*
1009           TODO: maybe write to some dummy hdr field, or write to magic
1010           offset without mmap, before the last sync, instead of the
1011           utime() call
1012         */
1013
1014         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1015            don't change the mtime of the file, this means the file may
1016            not be backed up (as tdb rounding to block sizes means that
1017            file size changes are quite rare too). The following forces
1018            mtime changes when a transaction completes */
1019 #if HAVE_UTIME
1020         utime(tdb->name, NULL);
1021 #endif
1022
1023         /* use a transaction cancel to free memory and remove the
1024            transaction locks */
1025         _tdb_transaction_cancel(tdb);
1026
1027         return 0;
1028 }
1029
1030
1031 /*
1032   recover from an aborted transaction. Must be called with exclusive
1033   database write access already established (including the open
1034   lock to prevent new processes attaching)
1035 */
1036 int tdb_transaction_recover(struct tdb_context *tdb)
1037 {
1038         tdb_off_t recovery_head, recovery_eof;
1039         unsigned char *data, *p;
1040         struct tdb_recovery_record rec;
1041
1042         /* find the recovery area */
1043         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1044         if (recovery_head == TDB_OFF_ERR) {
1045                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1046                          "tdb_transaction_recover:"
1047                          " failed to read recovery head");
1048                 return -1;
1049         }
1050
1051         if (recovery_head == 0) {
1052                 /* we have never allocated a recovery record */
1053                 return 0;
1054         }
1055
1056         /* read the recovery record */
1057         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1058                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1059                            "tdb_transaction_recover:"
1060                            " failed to read recovery record");
1061                 return -1;
1062         }
1063
1064         if (rec.magic != TDB_RECOVERY_MAGIC) {
1065                 /* there is no valid recovery data */
1066                 return 0;
1067         }
1068
1069         if (tdb->read_only) {
1070                 tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_DEBUG_FATAL,
1071                            "tdb_transaction_recover:"
1072                            " attempt to recover read only database");
1073                 return -1;
1074         }
1075
1076         recovery_eof = rec.eof;
1077
1078         data = (unsigned char *)malloc(rec.len);
1079         if (data == NULL) {
1080                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_DEBUG_FATAL,
1081                            "tdb_transaction_recover:"
1082                            " failed to allocate recovery data");
1083                 return -1;
1084         }
1085
1086         /* read the full recovery data */
1087         if (tdb->methods->read(tdb, recovery_head + sizeof(rec), data,
1088                                rec.len) == -1) {
1089                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1090                            "tdb_transaction_recover:"
1091                            " failed to read recovery data");
1092                 return -1;
1093         }
1094
1095         /* recover the file data */
1096         p = data;
1097         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1098                 tdb_off_t ofs;
1099                 tdb_len_t len;
1100                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1101                 memcpy(&ofs, p, sizeof(ofs));
1102                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1103                 p += sizeof(ofs) + sizeof(len);
1104
1105                 if (tdb->methods->write(tdb, ofs, p, len) == -1) {
1106                         free(data);
1107                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1108                                  "tdb_transaction_recover:"
1109                                  " failed to recover %zu bytes at offset %zu",
1110                                  (size_t)len, (size_t)ofs);
1111                         return -1;
1112                 }
1113                 p += len;
1114         }
1115
1116         free(data);
1117
1118         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1119                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1120                            "tdb_transaction_recover: failed to sync recovery");
1121                 return -1;
1122         }
1123
1124         /* if the recovery area is after the recovered eof then remove it */
1125         if (recovery_eof <= recovery_head) {
1126                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1127                     == -1) {
1128                         tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1129                                  "tdb_transaction_recover:"
1130                                  " failed to remove recovery head");
1131                         return -1;
1132                 }
1133         }
1134
1135         /* remove the recovery magic */
1136         if (tdb_write_off(tdb,
1137                           recovery_head
1138                           + offsetof(struct tdb_recovery_record, magic),
1139                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1140                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1141                          "tdb_transaction_recover:"
1142                          " failed to remove recovery magic");
1143                 return -1;
1144         }
1145
1146         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1147                 tdb_logerr(tdb, tdb->ecode, TDB_DEBUG_FATAL,
1148                          "tdb_transaction_recover: failed to sync2 recovery");
1149                 return -1;
1150         }
1151
1152         tdb_logerr(tdb, TDB_SUCCESS, TDB_DEBUG_TRACE,
1153                    "tdb_transaction_recover: recovered %zu byte database",
1154                    (size_t)recovery_eof);
1155
1156         /* all done */
1157         return 0;
1158 }
1159
1160 /* Any I/O failures we say "needs recovery". */
1161 bool tdb_needs_recovery(struct tdb_context *tdb)
1162 {
1163         tdb_off_t recovery_head;
1164         struct tdb_recovery_record rec;
1165
1166         /* find the recovery area */
1167         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1168         if (recovery_head == TDB_OFF_ERR) {
1169                 return true;
1170         }
1171
1172         if (recovery_head == 0) {
1173                 /* we have never allocated a recovery record */
1174                 return false;
1175         }
1176
1177         /* read the recovery record */
1178         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1179                 return true;
1180         }
1181
1182         return (rec.magic == TDB_RECOVERY_MAGIC);
1183 }