]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
tdb2: make tdb_check call check() function.
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x and reset it to NULL so a stale pointer cannot be freed or
   dereferenced again. free(NULL) is a no-op, so no NULL guard is needed.
   x is evaluated more than once: pass a plain lvalue without side effects. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->read(tdb, off, buf, len) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
169         return 0;
170
171 fail:
172         tdb->ecode = TDB_ERR_IO;
173         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
174                  "transaction_read: failed at off=%llu len=%llu\n",
175                  (long long)off, (long long)len);
176         tdb->transaction->transaction_error = 1;
177         return -1;
178 }
179
180
181 /*
182   write while in a transaction
183 */
184 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
185                              const void *buf, tdb_len_t len)
186 {
187         size_t blk;
188
189         /* Only a commit is allowed on a prepared transaction */
190         if (tdb->transaction->prepared) {
191                 tdb->ecode = TDB_ERR_EINVAL;
192                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
193                          "transaction_write: transaction already prepared,"
194                          " write not allowed\n");
195                 tdb->transaction->transaction_error = 1;
196                 return -1;
197         }
198
199         /* break it up into block sized chunks */
200         while (len + (off % getpagesize()) > getpagesize()) {
201                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
202                 if (transaction_write(tdb, off, buf, len2) != 0) {
203                         return -1;
204                 }
205                 len -= len2;
206                 off += len2;
207                 if (buf != NULL) {
208                         buf = (const void *)(len2 + (const char *)buf);
209                 }
210         }
211
212         if (len == 0) {
213                 return 0;
214         }
215
216         blk = off / getpagesize();
217         off = off % getpagesize();
218
219         if (tdb->transaction->num_blocks <= blk) {
220                 uint8_t **new_blocks;
221                 /* expand the blocks array */
222                 if (tdb->transaction->blocks == NULL) {
223                         new_blocks = (uint8_t **)malloc(
224                                 (blk+1)*sizeof(uint8_t *));
225                 } else {
226                         new_blocks = (uint8_t **)realloc(
227                                 tdb->transaction->blocks,
228                                 (blk+1)*sizeof(uint8_t *));
229                 }
230                 if (new_blocks == NULL) {
231                         tdb->ecode = TDB_ERR_OOM;
232                         goto fail;
233                 }
234                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
235                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
236                 tdb->transaction->blocks = new_blocks;
237                 tdb->transaction->num_blocks = blk+1;
238                 tdb->transaction->last_block_size = 0;
239         }
240
241         /* allocate and fill a block? */
242         if (tdb->transaction->blocks[blk] == NULL) {
243                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
244                 if (tdb->transaction->blocks[blk] == NULL) {
245                         tdb->ecode = TDB_ERR_OOM;
246                         tdb->transaction->transaction_error = 1;
247                         return -1;
248                 }
249                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
250                         tdb_len_t len2 = getpagesize();
251                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
252                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
253                         }
254                         if (tdb->transaction->io_methods->read(tdb, blk * getpagesize(),
255                                                                tdb->transaction->blocks[blk],
256                                                                len2) != 0) {
257                                 SAFE_FREE(tdb->transaction->blocks[blk]);
258                                 goto fail;
259                         }
260                         if (blk == tdb->transaction->num_blocks-1) {
261                                 tdb->transaction->last_block_size = len2;
262                         }
263                 }
264         }
265
266         /* overwrite part of an existing block */
267         if (buf == NULL) {
268                 memset(tdb->transaction->blocks[blk] + off, 0, len);
269         } else {
270                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271         }
272         if (blk == tdb->transaction->num_blocks-1) {
273                 if (len + off > tdb->transaction->last_block_size) {
274                         tdb->transaction->last_block_size = len + off;
275                 }
276         }
277
278         return 0;
279
280 fail:
281         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
282                  "transaction_write: failed at off=%llu len=%llu\n",
283                  (long long)((blk*getpagesize()) + off),
284                  (long long)len);
285         tdb->transaction->transaction_error = 1;
286         return -1;
287 }
288
289
290 /*
291   write while in a transaction - this varient never expands the transaction blocks, it only
292   updates existing blocks. This means it cannot change the recovery size
293 */
294 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
295                                        const void *buf, tdb_len_t len)
296 {
297         size_t blk;
298
299         /* break it up into block sized chunks */
300         while (len + (off % getpagesize()) > getpagesize()) {
301                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
302                 transaction_write_existing(tdb, off, buf, len2);
303                 len -= len2;
304                 off += len2;
305                 if (buf != NULL) {
306                         buf = (const void *)(len2 + (const char *)buf);
307                 }
308         }
309
310         if (len == 0) {
311                 return;
312         }
313
314         blk = off / getpagesize();
315         off = off % getpagesize();
316
317         if (tdb->transaction->num_blocks <= blk ||
318             tdb->transaction->blocks[blk] == NULL) {
319                 return;
320         }
321
322         if (blk == tdb->transaction->num_blocks-1 &&
323             off + len > tdb->transaction->last_block_size) {
324                 if (off >= tdb->transaction->last_block_size) {
325                         return;
326                 }
327                 len = tdb->transaction->last_block_size - off;
328         }
329
330         /* overwrite part of an existing block */
331         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
332 }
333
334
335 /*
336   out of bounds check during a transaction
337 */
338 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
339 {
340         if (len <= tdb->map_size) {
341                 return 0;
342         }
343         tdb->ecode = TDB_ERR_IO;
344         return -1;
345 }
346
347 /*
348   transaction version of tdb_expand().
349 */
350 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
351 {
352         /* add a write to the transaction elements, so subsequent
353            reads see the zero data */
354         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
355                 return -1;
356         }
357         tdb->map_size += addition;
358         return 0;
359 }
360
361 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
362                                 size_t len)
363 {
364         /* FIXME */
365         return NULL;
366 }
367
368 static const struct tdb_methods transaction_methods = {
369         transaction_read,
370         transaction_write,
371         transaction_oob,
372         transaction_expand_file,
373         transaction_direct,
374 };
375
376 /*
377   sync to disk
378 */
379 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
380 {
381         if (tdb->flags & TDB_NOSYNC) {
382                 return 0;
383         }
384
385         if (fsync(tdb->fd) != 0) {
386                 tdb->ecode = TDB_ERR_IO;
387                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
388                          "tdb_transaction: fsync failed\n");
389                 return -1;
390         }
391 #ifdef MS_SYNC
392         if (tdb->map_ptr) {
393                 tdb_off_t moffset = offset & ~(getpagesize()-1);
394                 if (msync(moffset + (char *)tdb->map_ptr,
395                           length + (offset - moffset), MS_SYNC) != 0) {
396                         tdb->ecode = TDB_ERR_IO;
397                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
398                                  "tdb_transaction: msync failed - %s\n",
399                                  strerror(errno));
400                         return -1;
401                 }
402         }
403 #endif
404         return 0;
405 }
406
407
408 static void _tdb_transaction_cancel(struct tdb_context *tdb)
409 {
410         int i;
411
412         if (tdb->transaction == NULL) {
413                 tdb->ecode = TDB_ERR_EINVAL;
414                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
415                          "tdb_transaction_cancel: no transaction\n");
416                 return;
417         }
418
419         if (tdb->transaction->nesting != 0) {
420                 tdb->transaction->transaction_error = 1;
421                 tdb->transaction->nesting--;
422                 return;
423         }
424
425         tdb->map_size = tdb->transaction->old_map_size;
426
427         /* free all the transaction blocks */
428         for (i=0;i<tdb->transaction->num_blocks;i++) {
429                 if (tdb->transaction->blocks[i] != NULL) {
430                         free(tdb->transaction->blocks[i]);
431                 }
432         }
433         SAFE_FREE(tdb->transaction->blocks);
434
435         if (tdb->transaction->magic_offset) {
436                 const struct tdb_methods *methods = tdb->transaction->io_methods;
437                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
438
439                 /* remove the recovery marker */
440                 if (methods->write(tdb, tdb->transaction->magic_offset,
441                                    &invalid, sizeof(invalid)) == -1 ||
442                     transaction_sync(tdb, tdb->transaction->magic_offset,
443                                      sizeof(invalid)) == -1) {
444                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
445                                  "tdb_transaction_cancel: failed to remove"
446                                  " recovery magic\n");
447                 }
448         }
449
450         if (tdb->allrecord_lock.count)
451                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
452
453         /* restore the normal io methods */
454         tdb->methods = tdb->transaction->io_methods;
455
456         tdb_transaction_unlock(tdb, F_WRLCK);
457
458         if (tdb_has_open_lock(tdb))
459                 tdb_unlock_open(tdb);
460
461         SAFE_FREE(tdb->transaction);
462 }
463
464 /*
465   start a tdb transaction. No token is returned, as only a single
466   transaction is allowed to be pending per tdb_context
467 */
468 int tdb_transaction_start(struct tdb_context *tdb)
469 {
470         /* some sanity checks */
471         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
472                 tdb->ecode = TDB_ERR_EINVAL;
473                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
474                          "tdb_transaction_start: cannot start a transaction"
475                          " on a read-only or internal db\n");
476                 return -1;
477         }
478
479         /* cope with nested tdb_transaction_start() calls */
480         if (tdb->transaction != NULL) {
481                 tdb->ecode = TDB_ERR_NESTING;
482                 return -1;
483         }
484
485         if (tdb_has_hash_locks(tdb)) {
486                 /* the caller must not have any locks when starting a
487                    transaction as otherwise we'll be screwed by lack
488                    of nested locks in posix */
489                 tdb->ecode = TDB_ERR_LOCK;
490                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
491                          "tdb_transaction_start: cannot start a transaction"
492                          " with locks held\n");
493                 return -1;
494         }
495
496         tdb->transaction = (struct tdb_transaction *)
497                 calloc(sizeof(struct tdb_transaction), 1);
498         if (tdb->transaction == NULL) {
499                 tdb->ecode = TDB_ERR_OOM;
500                 return -1;
501         }
502
503         /* get the transaction write lock. This is a blocking lock. As
504            discussed with Volker, there are a number of ways we could
505            make this async, which we will probably do in the future */
506         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
507                 SAFE_FREE(tdb->transaction->blocks);
508                 SAFE_FREE(tdb->transaction);
509                 return -1;
510         }
511
512         /* get a read lock over entire file. This is upgraded to a write
513            lock during the commit */
514         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
515                 goto fail_allrecord_lock;
516         }
517
518         /* make sure we know about any file expansions already done by
519            anyone else */
520         tdb->methods->oob(tdb, tdb->map_size + 1, true);
521         tdb->transaction->old_map_size = tdb->map_size;
522
523         /* finally hook the io methods, replacing them with
524            transaction specific methods */
525         tdb->transaction->io_methods = tdb->methods;
526         tdb->methods = &transaction_methods;
527         return 0;
528
529 fail_allrecord_lock:
530         tdb_transaction_unlock(tdb, F_WRLCK);
531         SAFE_FREE(tdb->transaction->blocks);
532         SAFE_FREE(tdb->transaction);
533         return -1;
534 }
535
536
/*
  cancel the current transaction (public entry point; all of the work
  is done by _tdb_transaction_cancel())
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
	_tdb_transaction_cancel(tdb);
	return;
}
544
545 /*
546   work out how much space the linearised recovery data will consume
547 */
548 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
549 {
550         tdb_len_t recovery_size = 0;
551         int i;
552
553         recovery_size = sizeof(tdb_len_t);
554         for (i=0;i<tdb->transaction->num_blocks;i++) {
555                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
556                         break;
557                 }
558                 if (tdb->transaction->blocks[i] == NULL) {
559                         continue;
560                 }
561                 recovery_size += 2*sizeof(tdb_off_t);
562                 if (i == tdb->transaction->num_blocks-1) {
563                         recovery_size += tdb->transaction->last_block_size;
564                 } else {
565                         recovery_size += getpagesize();
566                 }
567         }
568
569         return recovery_size;
570 }
571
572 /*
573   allocate the recovery area, or use an existing recovery area if it is
574   large enough
575 */
576 static int tdb_recovery_allocate(struct tdb_context *tdb,
577                                  tdb_len_t *recovery_size,
578                                  tdb_off_t *recovery_offset,
579                                  tdb_len_t *recovery_max_size)
580 {
581         struct tdb_recovery_record rec;
582         const struct tdb_methods *methods = tdb->transaction->io_methods;
583         tdb_off_t recovery_head;
584         size_t addition;
585
586         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
587         if (recovery_head == TDB_OFF_ERR) {
588                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
589                          "tdb_recovery_allocate:"
590                          " failed to read recovery head\n");
591                 return -1;
592         }
593
594         if (recovery_head != 0) {
595                 if (methods->read(tdb, recovery_head, &rec, sizeof(rec))) {
596                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
597                                  "tdb_recovery_allocate:"
598                                  " failed to read recovery record\n");
599                         return -1;
600                 }
601                 tdb_convert(tdb, &rec, sizeof(rec));
602                 /* ignore invalid recovery regions: can happen in crash */
603                 if (rec.magic != TDB_RECOVERY_MAGIC &&
604                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
605                         recovery_head = 0;
606                 }
607         }
608
609         *recovery_size = tdb_recovery_size(tdb);
610
611         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
612                 /* it fits in the existing area */
613                 *recovery_max_size = rec.max_len;
614                 *recovery_offset = recovery_head;
615                 return 0;
616         }
617
618         /* we need to free up the old recovery area, then allocate a
619            new one at the end of the file. Note that we cannot use
620            normal allocation to allocate the new one as that might return
621            us an area that is being currently used (as of the start of
622            the transaction) */
623         if (recovery_head != 0) {
624                 if (add_free_record(tdb, recovery_head,
625                                     sizeof(rec) + rec.max_len) != 0) {
626                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
627                                  "tdb_recovery_allocate:"
628                                  " failed to free previous recovery area\n");
629                         return -1;
630                 }
631         }
632
633         /* the tdb_free() call might have increased the recovery size */
634         *recovery_size = tdb_recovery_size(tdb);
635
636         /* round up to a multiple of page size */
637         *recovery_max_size
638                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
639                    & ~(getpagesize()-1))
640                 - sizeof(rec);
641         *recovery_offset = tdb->map_size;
642         recovery_head = *recovery_offset;
643
644         /* Restore ->map_size before calling underlying expand_file.
645            Also so that we don't try to expand the file again in the
646            transaction commit, which would destroy the recovery
647            area */
648         addition = (tdb->map_size - tdb->transaction->old_map_size) +
649                 sizeof(rec) + *recovery_max_size;
650         tdb->map_size = tdb->transaction->old_map_size;
651         if (methods->expand_file(tdb, addition) == -1) {
652                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
653                          "tdb_recovery_allocate:"
654                          " failed to create recovery area\n");
655                 return -1;
656         }
657
658         /* we have to reset the old map size so that we don't try to
659            expand the file again in the transaction commit, which
660            would destroy the recovery area */
661         tdb->transaction->old_map_size = tdb->map_size;
662
663         /* write the recovery header offset and sync - we can sync without a race here
664            as the magic ptr in the recovery record has not been set */
665         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
666         if (methods->write(tdb, offsetof(struct tdb_header, recovery),
667                            &recovery_head, sizeof(tdb_off_t)) == -1) {
668                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
669                          "tdb_recovery_allocate:"
670                          " failed to write recovery head\n");
671                 return -1;
672         }
673         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
674                                    &recovery_head,
675                                    sizeof(tdb_off_t));
676         return 0;
677 }
678
679 /* Set up header for the recovery record. */
680 static void set_recovery_header(struct tdb_recovery_record *rec,
681                                 uint64_t magic,
682                                 uint64_t datalen, uint64_t actuallen,
683                                 uint64_t oldsize)
684 {
685         rec->magic = magic;
686         rec->max_len = actuallen;
687         rec->len = datalen;
688         rec->eof = oldsize;
689 }
690
691 /*
692   setup the recovery data that will be used on a crash during commit
693 */
694 static int transaction_setup_recovery(struct tdb_context *tdb,
695                                       tdb_off_t *magic_offset)
696 {
697         tdb_len_t recovery_size;
698         unsigned char *data, *p;
699         const struct tdb_methods *methods = tdb->transaction->io_methods;
700         struct tdb_recovery_record *rec;
701         tdb_off_t recovery_offset, recovery_max_size;
702         tdb_off_t old_map_size = tdb->transaction->old_map_size;
703         uint64_t magic, tailer;
704         int i;
705
706         /*
707           check that the recovery area has enough space
708         */
709         if (tdb_recovery_allocate(tdb, &recovery_size,
710                                   &recovery_offset, &recovery_max_size) == -1) {
711                 return -1;
712         }
713
714         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
715         if (data == NULL) {
716                 tdb->ecode = TDB_ERR_OOM;
717                 return -1;
718         }
719
720         rec = (struct tdb_recovery_record *)data;
721         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
722                             recovery_size, recovery_max_size, old_map_size);
723         tdb_convert(tdb, rec, sizeof(*rec));
724
725         /* build the recovery data into a single blob to allow us to do a single
726            large write, which should be more efficient */
727         p = data + sizeof(*rec);
728         for (i=0;i<tdb->transaction->num_blocks;i++) {
729                 tdb_off_t offset;
730                 tdb_len_t length;
731
732                 if (tdb->transaction->blocks[i] == NULL) {
733                         continue;
734                 }
735
736                 offset = i * getpagesize();
737                 length = getpagesize();
738                 if (i == tdb->transaction->num_blocks-1) {
739                         length = tdb->transaction->last_block_size;
740                 }
741
742                 if (offset >= old_map_size) {
743                         continue;
744                 }
745                 if (offset + length > tdb->map_size) {
746                         tdb->ecode = TDB_ERR_CORRUPT;
747                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
748                                  "tdb_transaction_setup_recovery:"
749                                  " transaction data over new region boundary\n");
750                         free(data);
751                         return -1;
752                 }
753                 memcpy(p, &offset, sizeof(offset));
754                 memcpy(p + sizeof(offset), &length, sizeof(length));
755                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
756
757                 /* the recovery area contains the old data, not the
758                    new data, so we have to call the original tdb_read
759                    method to get it */
760                 if (methods->read(tdb, offset,
761                                   p + sizeof(offset) + sizeof(length),
762                                   length) != 0) {
763                         free(data);
764                         return -1;
765                 }
766                 p += sizeof(offset) + sizeof(length) + length;
767         }
768
769         /* and the tailer */
770         tailer = sizeof(*rec) + recovery_max_size;
771         memcpy(p, &tailer, sizeof(tailer));
772         tdb_convert(tdb, p, sizeof(tailer));
773
774         /* write the recovery data to the recovery area */
775         if (methods->write(tdb, recovery_offset, data,
776                            sizeof(*rec) + recovery_size) == -1) {
777                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
778                          "tdb_transaction_setup_recovery:"
779                          " failed to write recovery data\n");
780                 free(data);
781                 return -1;
782         }
783         transaction_write_existing(tdb, recovery_offset, data,
784                                    sizeof(*rec) + recovery_size);
785
786         /* as we don't have ordered writes, we have to sync the recovery
787            data before we update the magic to indicate that the recovery
788            data is present */
789         if (transaction_sync(tdb, recovery_offset,
790                              sizeof(*rec) + recovery_size) == -1) {
791                 free(data);
792                 return -1;
793         }
794
795         free(data);
796
797         magic = TDB_RECOVERY_MAGIC;
798         tdb_convert(tdb, &magic, sizeof(magic));
799
800         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
801                                                    magic);
802
803         if (methods->write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
804                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
805                          "tdb_transaction_setup_recovery:"
806                          " failed to write recovery magic\n");
807                 return -1;
808         }
809         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
810
811         /* ensure the recovery magic marker is on disk */
812         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
813                 return -1;
814         }
815
816         return 0;
817 }
818
819 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
820 {
821         const struct tdb_methods *methods;
822
823         if (tdb->transaction == NULL) {
824                 tdb->ecode = TDB_ERR_EINVAL;
825                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
826                          "tdb_transaction_prepare_commit: no transaction\n");
827                 return -1;
828         }
829
830         if (tdb->transaction->prepared) {
831                 tdb->ecode = TDB_ERR_EINVAL;
832                 _tdb_transaction_cancel(tdb);
833                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
834                          "tdb_transaction_prepare_commit:"
835                          " transaction already prepared\n");
836                 return -1;
837         }
838
839         if (tdb->transaction->transaction_error) {
840                 tdb->ecode = TDB_ERR_IO;
841                 _tdb_transaction_cancel(tdb);
842                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
843                          "tdb_transaction_prepare_commit:"
844                          " transaction error pending\n");
845                 return -1;
846         }
847
848
849         if (tdb->transaction->nesting != 0) {
850                 tdb->transaction->nesting--;
851                 return 0;
852         }
853
854         /* check for a null transaction */
855         if (tdb->transaction->blocks == NULL) {
856                 return 0;
857         }
858
859         methods = tdb->transaction->io_methods;
860
861         /* upgrade the main transaction lock region to a write lock */
862         if (tdb_allrecord_upgrade(tdb) == -1) {
863                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
864                          "tdb_transaction_prepare_commit:"
865                          " failed to upgrade hash locks\n");
866                 _tdb_transaction_cancel(tdb);
867                 return -1;
868         }
869
870         /* get the open lock - this prevents new users attaching to the database
871            during the commit */
872         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
873                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
874                          "tdb_transaction_prepare_commit:"
875                          " failed to get open lock\n");
876                 _tdb_transaction_cancel(tdb);
877                 return -1;
878         }
879
880         /* Since we have whole db locked, we don't need the expansion lock. */
881         if (!(tdb->flags & TDB_NOSYNC)) {
882                 /* write the recovery data to the end of the file */
883                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
884                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
885                                  "tdb_transaction_prepare_commit:"
886                                  " failed to setup recovery data\n");
887                         _tdb_transaction_cancel(tdb);
888                         return -1;
889                 }
890         }
891
892         tdb->transaction->prepared = true;
893
894         /* expand the file to the new size if needed */
895         if (tdb->map_size != tdb->transaction->old_map_size) {
896                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
897                 /* Restore original map size for tdb_expand_file */
898                 tdb->map_size = tdb->transaction->old_map_size;
899                 if (methods->expand_file(tdb, add) == -1) {
900                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
901                                  "tdb_transaction_prepare_commit:"
902                                  " expansion failed\n");
903                         _tdb_transaction_cancel(tdb);
904                         return -1;
905                 }
906         }
907
908         /* Keep the open lock until the actual commit */
909
910         return 0;
911 }
912
/*
   Public entry point for preparing the current transaction for commit.
   It forwards to _tdb_transaction_prepare_commit() and returns that
   function's result unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret = _tdb_transaction_prepare_commit(tdb);

	return ret;
}
920
921 /*
922   commit the current transaction
923 */
924 int tdb_transaction_commit(struct tdb_context *tdb)
925 {
926         const struct tdb_methods *methods;
927         int i;
928
929         if (tdb->transaction == NULL) {
930                 tdb->ecode = TDB_ERR_EINVAL;
931                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
932                          "tdb_transaction_commit: no transaction\n");
933                 return -1;
934         }
935
936         tdb_trace(tdb, "tdb_transaction_commit");
937
938         if (tdb->transaction->transaction_error) {
939                 tdb->ecode = TDB_ERR_IO;
940                 tdb_transaction_cancel(tdb);
941                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
942                          "tdb_transaction_commit: transaction error pending\n");
943                 return -1;
944         }
945
946
947         if (tdb->transaction->nesting != 0) {
948                 tdb->transaction->nesting--;
949                 return 0;
950         }
951
952         /* check for a null transaction */
953         if (tdb->transaction->blocks == NULL) {
954                 _tdb_transaction_cancel(tdb);
955                 return 0;
956         }
957
958         if (!tdb->transaction->prepared) {
959                 int ret = _tdb_transaction_prepare_commit(tdb);
960                 if (ret)
961                         return ret;
962         }
963
964         methods = tdb->transaction->io_methods;
965
966         /* perform all the writes */
967         for (i=0;i<tdb->transaction->num_blocks;i++) {
968                 tdb_off_t offset;
969                 tdb_len_t length;
970
971                 if (tdb->transaction->blocks[i] == NULL) {
972                         continue;
973                 }
974
975                 offset = i * getpagesize();
976                 length = getpagesize();
977                 if (i == tdb->transaction->num_blocks-1) {
978                         length = tdb->transaction->last_block_size;
979                 }
980
981                 if (methods->write(tdb, offset, tdb->transaction->blocks[i],
982                                    length) == -1) {
983                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
984                                  "tdb_transaction_commit:"
985                                  " write failed during commit\n");
986
987                         /* we've overwritten part of the data and
988                            possibly expanded the file, so we need to
989                            run the crash recovery code */
990                         tdb->methods = methods;
991                         tdb_transaction_recover(tdb);
992
993                         _tdb_transaction_cancel(tdb);
994
995                         return -1;
996                 }
997                 SAFE_FREE(tdb->transaction->blocks[i]);
998         }
999
1000         SAFE_FREE(tdb->transaction->blocks);
1001         tdb->transaction->num_blocks = 0;
1002
1003         /* ensure the new data is on disk */
1004         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1005                 return -1;
1006         }
1007
1008         /*
1009           TODO: maybe write to some dummy hdr field, or write to magic
1010           offset without mmap, before the last sync, instead of the
1011           utime() call
1012         */
1013
1014         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1015            don't change the mtime of the file, this means the file may
1016            not be backed up (as tdb rounding to block sizes means that
1017            file size changes are quite rare too). The following forces
1018            mtime changes when a transaction completes */
1019 #if HAVE_UTIME
1020         utime(tdb->name, NULL);
1021 #endif
1022
1023         /* use a transaction cancel to free memory and remove the
1024            transaction locks */
1025         _tdb_transaction_cancel(tdb);
1026
1027         return 0;
1028 }
1029
1030
1031 /*
1032   recover from an aborted transaction. Must be called with exclusive
1033   database write access already established (including the open
1034   lock to prevent new processes attaching)
1035 */
1036 int tdb_transaction_recover(struct tdb_context *tdb)
1037 {
1038         tdb_off_t recovery_head, recovery_eof;
1039         unsigned char *data, *p;
1040         struct tdb_recovery_record rec;
1041
1042         /* find the recovery area */
1043         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1044         if (recovery_head == TDB_OFF_ERR) {
1045                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1046                          "tdb_transaction_recover:"
1047                          " failed to read recovery head\n");
1048                 return -1;
1049         }
1050
1051         if (recovery_head == 0) {
1052                 /* we have never allocated a recovery record */
1053                 return 0;
1054         }
1055
1056         /* read the recovery record */
1057         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1058                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1059                          "tdb_transaction_recover:"
1060                          " failed to read recovery record\n");
1061                 return -1;
1062         }
1063
1064         if (rec.magic != TDB_RECOVERY_MAGIC) {
1065                 /* there is no valid recovery data */
1066                 return 0;
1067         }
1068
1069         if (tdb->read_only) {
1070                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1071                          "tdb_transaction_recover:"
1072                          " attempt to recover read only database\n");
1073                 tdb->ecode = TDB_ERR_CORRUPT;
1074                 return -1;
1075         }
1076
1077         recovery_eof = rec.eof;
1078
1079         data = (unsigned char *)malloc(rec.len);
1080         if (data == NULL) {
1081                 tdb->ecode = TDB_ERR_OOM;
1082                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1083                          "tdb_transaction_recover:"
1084                          " failed to allocate recovery data\n");
1085                 return -1;
1086         }
1087
1088         /* read the full recovery data */
1089         if (tdb->methods->read(tdb, recovery_head + sizeof(rec), data,
1090                                rec.len) == -1) {
1091                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1092                          "tdb_transaction_recover:"
1093                          " failed to read recovery data\n");
1094                 return -1;
1095         }
1096
1097         /* recover the file data */
1098         p = data;
1099         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1100                 tdb_off_t ofs;
1101                 tdb_len_t len;
1102                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1103                 memcpy(&ofs, p, sizeof(ofs));
1104                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1105                 p += sizeof(ofs) + sizeof(len);
1106
1107                 if (tdb->methods->write(tdb, ofs, p, len) == -1) {
1108                         free(data);
1109                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1110                                  "tdb_transaction_recover:"
1111                                  " failed to recover %zu bytes at offset %zu\n",
1112                                  (size_t)len, (size_t)ofs);
1113                         return -1;
1114                 }
1115                 p += len;
1116         }
1117
1118         free(data);
1119
1120         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1121                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1122                          "tdb_transaction_recover: failed to sync recovery\n");
1123                 return -1;
1124         }
1125
1126         /* if the recovery area is after the recovered eof then remove it */
1127         if (recovery_eof <= recovery_head) {
1128                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1129                     == -1) {
1130                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1131                                  "tdb_transaction_recover:"
1132                                  " failed to remove recovery head\n");
1133                         return -1;
1134                 }
1135         }
1136
1137         /* remove the recovery magic */
1138         if (tdb_write_off(tdb,
1139                           recovery_head
1140                           + offsetof(struct tdb_recovery_record, magic),
1141                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1142                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1143                          "tdb_transaction_recover:"
1144                          " failed to remove recovery magic\n");
1145                 return -1;
1146         }
1147
1148         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1149                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1150                          "tdb_transaction_recover: failed to sync2 recovery\n");
1151                 return -1;
1152         }
1153
1154         tdb->log(tdb, TDB_DEBUG_TRACE, tdb->log_priv,
1155                  "tdb_transaction_recover: recovered %zu byte database\n",
1156                  (size_t)recovery_eof);
1157
1158         /* all done */
1159         return 0;
1160 }
1161
1162 /* Any I/O failures we say "needs recovery". */
1163 bool tdb_needs_recovery(struct tdb_context *tdb)
1164 {
1165         tdb_off_t recovery_head;
1166         struct tdb_recovery_record rec;
1167
1168         /* find the recovery area */
1169         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1170         if (recovery_head == TDB_OFF_ERR) {
1171                 return true;
1172         }
1173
1174         if (recovery_head == 0) {
1175                 /* we have never allocated a recovery record */
1176                 return false;
1177         }
1178
1179         /* read the recovery record */
1180         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1181                 return true;
1182         }
1183
1184         return (rec.magic == TDB_RECOVERY_MAGIC);
1185 }