]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
tdb2: Add stats attribute.
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x and reset it to NULL so that a stale pointer can never be freed
   twice.  free(NULL) is defined to do nothing, so no NULL guard is needed.
   x is evaluated more than once: pass a side-effect-free lvalue. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->read(tdb, off, buf, len) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
169         return 0;
170
171 fail:
172         tdb->ecode = TDB_ERR_IO;
173         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
174                  "transaction_read: failed at off=%llu len=%llu\n",
175                  (long long)off, (long long)len);
176         tdb->transaction->transaction_error = 1;
177         return -1;
178 }
179
180
181 /*
182   write while in a transaction
183 */
184 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
185                              const void *buf, tdb_len_t len)
186 {
187         size_t blk;
188
189         /* Only a commit is allowed on a prepared transaction */
190         if (tdb->transaction->prepared) {
191                 tdb->ecode = TDB_ERR_EINVAL;
192                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
193                          "transaction_write: transaction already prepared,"
194                          " write not allowed\n");
195                 tdb->transaction->transaction_error = 1;
196                 return -1;
197         }
198
199         /* break it up into block sized chunks */
200         while (len + (off % getpagesize()) > getpagesize()) {
201                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
202                 if (transaction_write(tdb, off, buf, len2) != 0) {
203                         return -1;
204                 }
205                 len -= len2;
206                 off += len2;
207                 if (buf != NULL) {
208                         buf = (const void *)(len2 + (const char *)buf);
209                 }
210         }
211
212         if (len == 0) {
213                 return 0;
214         }
215
216         blk = off / getpagesize();
217         off = off % getpagesize();
218
219         if (tdb->transaction->num_blocks <= blk) {
220                 uint8_t **new_blocks;
221                 /* expand the blocks array */
222                 if (tdb->transaction->blocks == NULL) {
223                         new_blocks = (uint8_t **)malloc(
224                                 (blk+1)*sizeof(uint8_t *));
225                 } else {
226                         new_blocks = (uint8_t **)realloc(
227                                 tdb->transaction->blocks,
228                                 (blk+1)*sizeof(uint8_t *));
229                 }
230                 if (new_blocks == NULL) {
231                         tdb->ecode = TDB_ERR_OOM;
232                         goto fail;
233                 }
234                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
235                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
236                 tdb->transaction->blocks = new_blocks;
237                 tdb->transaction->num_blocks = blk+1;
238                 tdb->transaction->last_block_size = 0;
239         }
240
241         /* allocate and fill a block? */
242         if (tdb->transaction->blocks[blk] == NULL) {
243                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
244                 if (tdb->transaction->blocks[blk] == NULL) {
245                         tdb->ecode = TDB_ERR_OOM;
246                         tdb->transaction->transaction_error = 1;
247                         return -1;
248                 }
249                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
250                         tdb_len_t len2 = getpagesize();
251                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
252                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
253                         }
254                         if (tdb->transaction->io_methods->read(tdb, blk * getpagesize(),
255                                                                tdb->transaction->blocks[blk],
256                                                                len2) != 0) {
257                                 SAFE_FREE(tdb->transaction->blocks[blk]);
258                                 goto fail;
259                         }
260                         if (blk == tdb->transaction->num_blocks-1) {
261                                 tdb->transaction->last_block_size = len2;
262                         }
263                 }
264         }
265
266         /* overwrite part of an existing block */
267         if (buf == NULL) {
268                 memset(tdb->transaction->blocks[blk] + off, 0, len);
269         } else {
270                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271         }
272         if (blk == tdb->transaction->num_blocks-1) {
273                 if (len + off > tdb->transaction->last_block_size) {
274                         tdb->transaction->last_block_size = len + off;
275                 }
276         }
277
278         return 0;
279
280 fail:
281         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
282                  "transaction_write: failed at off=%llu len=%llu\n",
283                  (long long)((blk*getpagesize()) + off),
284                  (long long)len);
285         tdb->transaction->transaction_error = 1;
286         return -1;
287 }
288
289
290 /*
291   write while in a transaction - this varient never expands the transaction blocks, it only
292   updates existing blocks. This means it cannot change the recovery size
293 */
294 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
295                                        const void *buf, tdb_len_t len)
296 {
297         size_t blk;
298
299         /* break it up into block sized chunks */
300         while (len + (off % getpagesize()) > getpagesize()) {
301                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
302                 transaction_write_existing(tdb, off, buf, len2);
303                 len -= len2;
304                 off += len2;
305                 if (buf != NULL) {
306                         buf = (const void *)(len2 + (const char *)buf);
307                 }
308         }
309
310         if (len == 0) {
311                 return;
312         }
313
314         blk = off / getpagesize();
315         off = off % getpagesize();
316
317         if (tdb->transaction->num_blocks <= blk ||
318             tdb->transaction->blocks[blk] == NULL) {
319                 return;
320         }
321
322         if (blk == tdb->transaction->num_blocks-1 &&
323             off + len > tdb->transaction->last_block_size) {
324                 if (off >= tdb->transaction->last_block_size) {
325                         return;
326                 }
327                 len = tdb->transaction->last_block_size - off;
328         }
329
330         /* overwrite part of an existing block */
331         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
332 }
333
334
335 /*
336   out of bounds check during a transaction
337 */
338 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
339 {
340         if (len <= tdb->map_size) {
341                 return 0;
342         }
343         tdb->ecode = TDB_ERR_IO;
344         return -1;
345 }
346
347 /*
348   transaction version of tdb_expand().
349 */
350 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
351 {
352         /* add a write to the transaction elements, so subsequent
353            reads see the zero data */
354         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
355                 return -1;
356         }
357         tdb->map_size += addition;
358         return 0;
359 }
360
361 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
362                                 size_t len)
363 {
364         /* FIXME */
365         return NULL;
366 }
367
368 static const struct tdb_methods transaction_methods = {
369         transaction_read,
370         transaction_write,
371         transaction_oob,
372         transaction_expand_file,
373         transaction_direct,
374 };
375
376 /*
377   sync to disk
378 */
379 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
380 {
381         if (tdb->flags & TDB_NOSYNC) {
382                 return 0;
383         }
384
385         if (fsync(tdb->fd) != 0) {
386                 tdb->ecode = TDB_ERR_IO;
387                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
388                          "tdb_transaction: fsync failed\n");
389                 return -1;
390         }
391 #ifdef MS_SYNC
392         if (tdb->map_ptr) {
393                 tdb_off_t moffset = offset & ~(getpagesize()-1);
394                 if (msync(moffset + (char *)tdb->map_ptr,
395                           length + (offset - moffset), MS_SYNC) != 0) {
396                         tdb->ecode = TDB_ERR_IO;
397                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
398                                  "tdb_transaction: msync failed - %s\n",
399                                  strerror(errno));
400                         return -1;
401                 }
402         }
403 #endif
404         return 0;
405 }
406
407
408 static void _tdb_transaction_cancel(struct tdb_context *tdb)
409 {
410         int i;
411
412         if (tdb->transaction == NULL) {
413                 tdb->ecode = TDB_ERR_EINVAL;
414                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
415                          "tdb_transaction_cancel: no transaction\n");
416                 return;
417         }
418
419         if (tdb->transaction->nesting != 0) {
420                 tdb->transaction->transaction_error = 1;
421                 tdb->transaction->nesting--;
422                 return;
423         }
424
425         tdb->map_size = tdb->transaction->old_map_size;
426
427         /* free all the transaction blocks */
428         for (i=0;i<tdb->transaction->num_blocks;i++) {
429                 if (tdb->transaction->blocks[i] != NULL) {
430                         free(tdb->transaction->blocks[i]);
431                 }
432         }
433         SAFE_FREE(tdb->transaction->blocks);
434
435         if (tdb->transaction->magic_offset) {
436                 const struct tdb_methods *methods = tdb->transaction->io_methods;
437                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
438
439                 /* remove the recovery marker */
440                 if (methods->write(tdb, tdb->transaction->magic_offset,
441                                    &invalid, sizeof(invalid)) == -1 ||
442                     transaction_sync(tdb, tdb->transaction->magic_offset,
443                                      sizeof(invalid)) == -1) {
444                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
445                                  "tdb_transaction_cancel: failed to remove"
446                                  " recovery magic\n");
447                 }
448         }
449
450         if (tdb->allrecord_lock.count)
451                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
452
453         /* restore the normal io methods */
454         tdb->methods = tdb->transaction->io_methods;
455
456         tdb_transaction_unlock(tdb, F_WRLCK);
457
458         if (tdb_has_open_lock(tdb))
459                 tdb_unlock_open(tdb);
460
461         SAFE_FREE(tdb->transaction);
462 }
463
464 /*
465   start a tdb transaction. No token is returned, as only a single
466   transaction is allowed to be pending per tdb_context
467 */
468 int tdb_transaction_start(struct tdb_context *tdb)
469 {
470         /* some sanity checks */
471         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
472                 tdb->ecode = TDB_ERR_EINVAL;
473                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
474                          "tdb_transaction_start: cannot start a transaction"
475                          " on a read-only or internal db\n");
476                 return -1;
477         }
478
479         /* cope with nested tdb_transaction_start() calls */
480         if (tdb->transaction != NULL) {
481                 tdb->ecode = TDB_ERR_NESTING;
482                 return -1;
483         }
484
485         if (tdb_has_hash_locks(tdb)) {
486                 /* the caller must not have any locks when starting a
487                    transaction as otherwise we'll be screwed by lack
488                    of nested locks in posix */
489                 tdb->ecode = TDB_ERR_LOCK;
490                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
491                          "tdb_transaction_start: cannot start a transaction"
492                          " with locks held\n");
493                 return -1;
494         }
495
496         tdb->transaction = (struct tdb_transaction *)
497                 calloc(sizeof(struct tdb_transaction), 1);
498         if (tdb->transaction == NULL) {
499                 tdb->ecode = TDB_ERR_OOM;
500                 return -1;
501         }
502
503         /* get the transaction write lock. This is a blocking lock. As
504            discussed with Volker, there are a number of ways we could
505            make this async, which we will probably do in the future */
506         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
507                 SAFE_FREE(tdb->transaction->blocks);
508                 SAFE_FREE(tdb->transaction);
509                 return -1;
510         }
511
512         /* get a read lock over entire file. This is upgraded to a write
513            lock during the commit */
514         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
515                 goto fail_allrecord_lock;
516         }
517
518         /* make sure we know about any file expansions already done by
519            anyone else */
520         tdb->methods->oob(tdb, tdb->map_size + 1, true);
521         tdb->transaction->old_map_size = tdb->map_size;
522
523         /* finally hook the io methods, replacing them with
524            transaction specific methods */
525         tdb->transaction->io_methods = tdb->methods;
526         tdb->methods = &transaction_methods;
527         return 0;
528
529 fail_allrecord_lock:
530         tdb_transaction_unlock(tdb, F_WRLCK);
531         SAFE_FREE(tdb->transaction->blocks);
532         SAFE_FREE(tdb->transaction);
533         return -1;
534 }
535
536
/*
  Public entry point for cancelling the current transaction; all the
  work is done by _tdb_transaction_cancel().
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
	_tdb_transaction_cancel(tdb);
}
544
545 /*
546   work out how much space the linearised recovery data will consume
547 */
548 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
549 {
550         tdb_len_t recovery_size = 0;
551         int i;
552
553         recovery_size = sizeof(tdb_len_t);
554         for (i=0;i<tdb->transaction->num_blocks;i++) {
555                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
556                         break;
557                 }
558                 if (tdb->transaction->blocks[i] == NULL) {
559                         continue;
560                 }
561                 recovery_size += 2*sizeof(tdb_off_t);
562                 if (i == tdb->transaction->num_blocks-1) {
563                         recovery_size += tdb->transaction->last_block_size;
564                 } else {
565                         recovery_size += getpagesize();
566                 }
567         }
568
569         return recovery_size;
570 }
571
572 /*
573   allocate the recovery area, or use an existing recovery area if it is
574   large enough
575 */
576 static int tdb_recovery_allocate(struct tdb_context *tdb,
577                                  tdb_len_t *recovery_size,
578                                  tdb_off_t *recovery_offset,
579                                  tdb_len_t *recovery_max_size)
580 {
581         struct tdb_recovery_record rec;
582         const struct tdb_methods *methods = tdb->transaction->io_methods;
583         tdb_off_t recovery_head;
584         size_t addition;
585
586         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
587         if (recovery_head == TDB_OFF_ERR) {
588                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
589                          "tdb_recovery_allocate:"
590                          " failed to read recovery head\n");
591                 return -1;
592         }
593
594         if (recovery_head != 0) {
595                 if (methods->read(tdb, recovery_head, &rec, sizeof(rec))) {
596                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
597                                  "tdb_recovery_allocate:"
598                                  " failed to read recovery record\n");
599                         return -1;
600                 }
601                 tdb_convert(tdb, &rec, sizeof(rec));
602                 /* ignore invalid recovery regions: can happen in crash */
603                 if (rec.magic != TDB_RECOVERY_MAGIC &&
604                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
605                         recovery_head = 0;
606                 }
607         }
608
609         *recovery_size = tdb_recovery_size(tdb);
610
611         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
612                 /* it fits in the existing area */
613                 *recovery_max_size = rec.max_len;
614                 *recovery_offset = recovery_head;
615                 return 0;
616         }
617
618         /* we need to free up the old recovery area, then allocate a
619            new one at the end of the file. Note that we cannot use
620            normal allocation to allocate the new one as that might return
621            us an area that is being currently used (as of the start of
622            the transaction) */
623         if (recovery_head != 0) {
624                 add_stat(tdb, frees, 1);
625                 if (add_free_record(tdb, recovery_head,
626                                     sizeof(rec) + rec.max_len) != 0) {
627                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
628                                  "tdb_recovery_allocate:"
629                                  " failed to free previous recovery area\n");
630                         return -1;
631                 }
632         }
633
634         /* the tdb_free() call might have increased the recovery size */
635         *recovery_size = tdb_recovery_size(tdb);
636
637         /* round up to a multiple of page size */
638         *recovery_max_size
639                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
640                    & ~(getpagesize()-1))
641                 - sizeof(rec);
642         *recovery_offset = tdb->map_size;
643         recovery_head = *recovery_offset;
644
645         /* Restore ->map_size before calling underlying expand_file.
646            Also so that we don't try to expand the file again in the
647            transaction commit, which would destroy the recovery
648            area */
649         addition = (tdb->map_size - tdb->transaction->old_map_size) +
650                 sizeof(rec) + *recovery_max_size;
651         tdb->map_size = tdb->transaction->old_map_size;
652         if (methods->expand_file(tdb, addition) == -1) {
653                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
654                          "tdb_recovery_allocate:"
655                          " failed to create recovery area\n");
656                 return -1;
657         }
658
659         /* we have to reset the old map size so that we don't try to
660            expand the file again in the transaction commit, which
661            would destroy the recovery area */
662         tdb->transaction->old_map_size = tdb->map_size;
663
664         /* write the recovery header offset and sync - we can sync without a race here
665            as the magic ptr in the recovery record has not been set */
666         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
667         if (methods->write(tdb, offsetof(struct tdb_header, recovery),
668                            &recovery_head, sizeof(tdb_off_t)) == -1) {
669                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
670                          "tdb_recovery_allocate:"
671                          " failed to write recovery head\n");
672                 return -1;
673         }
674         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
675                                    &recovery_head,
676                                    sizeof(tdb_off_t));
677         return 0;
678 }
679
680 /* Set up header for the recovery record. */
681 static void set_recovery_header(struct tdb_recovery_record *rec,
682                                 uint64_t magic,
683                                 uint64_t datalen, uint64_t actuallen,
684                                 uint64_t oldsize)
685 {
686         rec->magic = magic;
687         rec->max_len = actuallen;
688         rec->len = datalen;
689         rec->eof = oldsize;
690 }
691
692 /*
693   setup the recovery data that will be used on a crash during commit
694 */
695 static int transaction_setup_recovery(struct tdb_context *tdb,
696                                       tdb_off_t *magic_offset)
697 {
698         tdb_len_t recovery_size;
699         unsigned char *data, *p;
700         const struct tdb_methods *methods = tdb->transaction->io_methods;
701         struct tdb_recovery_record *rec;
702         tdb_off_t recovery_offset, recovery_max_size;
703         tdb_off_t old_map_size = tdb->transaction->old_map_size;
704         uint64_t magic, tailer;
705         int i;
706
707         /*
708           check that the recovery area has enough space
709         */
710         if (tdb_recovery_allocate(tdb, &recovery_size,
711                                   &recovery_offset, &recovery_max_size) == -1) {
712                 return -1;
713         }
714
715         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
716         if (data == NULL) {
717                 tdb->ecode = TDB_ERR_OOM;
718                 return -1;
719         }
720
721         rec = (struct tdb_recovery_record *)data;
722         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
723                             recovery_size, recovery_max_size, old_map_size);
724         tdb_convert(tdb, rec, sizeof(*rec));
725
726         /* build the recovery data into a single blob to allow us to do a single
727            large write, which should be more efficient */
728         p = data + sizeof(*rec);
729         for (i=0;i<tdb->transaction->num_blocks;i++) {
730                 tdb_off_t offset;
731                 tdb_len_t length;
732
733                 if (tdb->transaction->blocks[i] == NULL) {
734                         continue;
735                 }
736
737                 offset = i * getpagesize();
738                 length = getpagesize();
739                 if (i == tdb->transaction->num_blocks-1) {
740                         length = tdb->transaction->last_block_size;
741                 }
742
743                 if (offset >= old_map_size) {
744                         continue;
745                 }
746                 if (offset + length > tdb->map_size) {
747                         tdb->ecode = TDB_ERR_CORRUPT;
748                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
749                                  "tdb_transaction_setup_recovery:"
750                                  " transaction data over new region boundary\n");
751                         free(data);
752                         return -1;
753                 }
754                 memcpy(p, &offset, sizeof(offset));
755                 memcpy(p + sizeof(offset), &length, sizeof(length));
756                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
757
758                 /* the recovery area contains the old data, not the
759                    new data, so we have to call the original tdb_read
760                    method to get it */
761                 if (methods->read(tdb, offset,
762                                   p + sizeof(offset) + sizeof(length),
763                                   length) != 0) {
764                         free(data);
765                         return -1;
766                 }
767                 p += sizeof(offset) + sizeof(length) + length;
768         }
769
770         /* and the tailer */
771         tailer = sizeof(*rec) + recovery_max_size;
772         memcpy(p, &tailer, sizeof(tailer));
773         tdb_convert(tdb, p, sizeof(tailer));
774
775         /* write the recovery data to the recovery area */
776         if (methods->write(tdb, recovery_offset, data,
777                            sizeof(*rec) + recovery_size) == -1) {
778                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
779                          "tdb_transaction_setup_recovery:"
780                          " failed to write recovery data\n");
781                 free(data);
782                 return -1;
783         }
784         transaction_write_existing(tdb, recovery_offset, data,
785                                    sizeof(*rec) + recovery_size);
786
787         /* as we don't have ordered writes, we have to sync the recovery
788            data before we update the magic to indicate that the recovery
789            data is present */
790         if (transaction_sync(tdb, recovery_offset,
791                              sizeof(*rec) + recovery_size) == -1) {
792                 free(data);
793                 return -1;
794         }
795
796         free(data);
797
798         magic = TDB_RECOVERY_MAGIC;
799         tdb_convert(tdb, &magic, sizeof(magic));
800
801         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
802                                                    magic);
803
804         if (methods->write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
805                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
806                          "tdb_transaction_setup_recovery:"
807                          " failed to write recovery magic\n");
808                 return -1;
809         }
810         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
811
812         /* ensure the recovery magic marker is on disk */
813         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
814                 return -1;
815         }
816
817         return 0;
818 }
819
820 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
821 {
822         const struct tdb_methods *methods;
823
824         if (tdb->transaction == NULL) {
825                 tdb->ecode = TDB_ERR_EINVAL;
826                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
827                          "tdb_transaction_prepare_commit: no transaction\n");
828                 return -1;
829         }
830
831         if (tdb->transaction->prepared) {
832                 tdb->ecode = TDB_ERR_EINVAL;
833                 _tdb_transaction_cancel(tdb);
834                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
835                          "tdb_transaction_prepare_commit:"
836                          " transaction already prepared\n");
837                 return -1;
838         }
839
840         if (tdb->transaction->transaction_error) {
841                 tdb->ecode = TDB_ERR_IO;
842                 _tdb_transaction_cancel(tdb);
843                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
844                          "tdb_transaction_prepare_commit:"
845                          " transaction error pending\n");
846                 return -1;
847         }
848
849
850         if (tdb->transaction->nesting != 0) {
851                 tdb->transaction->nesting--;
852                 return 0;
853         }
854
855         /* check for a null transaction */
856         if (tdb->transaction->blocks == NULL) {
857                 return 0;
858         }
859
860         methods = tdb->transaction->io_methods;
861
862         /* upgrade the main transaction lock region to a write lock */
863         if (tdb_allrecord_upgrade(tdb) == -1) {
864                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
865                          "tdb_transaction_prepare_commit:"
866                          " failed to upgrade hash locks\n");
867                 _tdb_transaction_cancel(tdb);
868                 return -1;
869         }
870
871         /* get the open lock - this prevents new users attaching to the database
872            during the commit */
873         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
874                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
875                          "tdb_transaction_prepare_commit:"
876                          " failed to get open lock\n");
877                 _tdb_transaction_cancel(tdb);
878                 return -1;
879         }
880
881         /* Since we have whole db locked, we don't need the expansion lock. */
882         if (!(tdb->flags & TDB_NOSYNC)) {
883                 /* write the recovery data to the end of the file */
884                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
885                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
886                                  "tdb_transaction_prepare_commit:"
887                                  " failed to setup recovery data\n");
888                         _tdb_transaction_cancel(tdb);
889                         return -1;
890                 }
891         }
892
893         tdb->transaction->prepared = true;
894
895         /* expand the file to the new size if needed */
896         if (tdb->map_size != tdb->transaction->old_map_size) {
897                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
898                 /* Restore original map size for tdb_expand_file */
899                 tdb->map_size = tdb->transaction->old_map_size;
900                 if (methods->expand_file(tdb, add) == -1) {
901                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
902                                  "tdb_transaction_prepare_commit:"
903                                  " expansion failed\n");
904                         _tdb_transaction_cancel(tdb);
905                         return -1;
906                 }
907         }
908
909         /* Keep the open lock until the actual commit */
910
911         return 0;
912 }
913
/*
   Public entry point for the prepare phase of a commit: forwards to
   _tdb_transaction_prepare_commit() and hands its result back unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
        int ret = _tdb_transaction_prepare_commit(tdb);

        return ret;
}
921
922 /*
923   commit the current transaction
924 */
925 int tdb_transaction_commit(struct tdb_context *tdb)
926 {
927         const struct tdb_methods *methods;
928         int i;
929
930         if (tdb->transaction == NULL) {
931                 tdb->ecode = TDB_ERR_EINVAL;
932                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
933                          "tdb_transaction_commit: no transaction\n");
934                 return -1;
935         }
936
937         tdb_trace(tdb, "tdb_transaction_commit");
938
939         if (tdb->transaction->transaction_error) {
940                 tdb->ecode = TDB_ERR_IO;
941                 tdb_transaction_cancel(tdb);
942                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
943                          "tdb_transaction_commit: transaction error pending\n");
944                 return -1;
945         }
946
947
948         if (tdb->transaction->nesting != 0) {
949                 tdb->transaction->nesting--;
950                 return 0;
951         }
952
953         /* check for a null transaction */
954         if (tdb->transaction->blocks == NULL) {
955                 _tdb_transaction_cancel(tdb);
956                 return 0;
957         }
958
959         if (!tdb->transaction->prepared) {
960                 int ret = _tdb_transaction_prepare_commit(tdb);
961                 if (ret)
962                         return ret;
963         }
964
965         methods = tdb->transaction->io_methods;
966
967         /* perform all the writes */
968         for (i=0;i<tdb->transaction->num_blocks;i++) {
969                 tdb_off_t offset;
970                 tdb_len_t length;
971
972                 if (tdb->transaction->blocks[i] == NULL) {
973                         continue;
974                 }
975
976                 offset = i * getpagesize();
977                 length = getpagesize();
978                 if (i == tdb->transaction->num_blocks-1) {
979                         length = tdb->transaction->last_block_size;
980                 }
981
982                 if (methods->write(tdb, offset, tdb->transaction->blocks[i],
983                                    length) == -1) {
984                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
985                                  "tdb_transaction_commit:"
986                                  " write failed during commit\n");
987
988                         /* we've overwritten part of the data and
989                            possibly expanded the file, so we need to
990                            run the crash recovery code */
991                         tdb->methods = methods;
992                         tdb_transaction_recover(tdb);
993
994                         _tdb_transaction_cancel(tdb);
995
996                         return -1;
997                 }
998                 SAFE_FREE(tdb->transaction->blocks[i]);
999         }
1000
1001         SAFE_FREE(tdb->transaction->blocks);
1002         tdb->transaction->num_blocks = 0;
1003
1004         /* ensure the new data is on disk */
1005         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1006                 return -1;
1007         }
1008
1009         /*
1010           TODO: maybe write to some dummy hdr field, or write to magic
1011           offset without mmap, before the last sync, instead of the
1012           utime() call
1013         */
1014
1015         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1016            don't change the mtime of the file, this means the file may
1017            not be backed up (as tdb rounding to block sizes means that
1018            file size changes are quite rare too). The following forces
1019            mtime changes when a transaction completes */
1020 #if HAVE_UTIME
1021         utime(tdb->name, NULL);
1022 #endif
1023
1024         /* use a transaction cancel to free memory and remove the
1025            transaction locks */
1026         _tdb_transaction_cancel(tdb);
1027
1028         return 0;
1029 }
1030
1031
1032 /*
1033   recover from an aborted transaction. Must be called with exclusive
1034   database write access already established (including the open
1035   lock to prevent new processes attaching)
1036 */
1037 int tdb_transaction_recover(struct tdb_context *tdb)
1038 {
1039         tdb_off_t recovery_head, recovery_eof;
1040         unsigned char *data, *p;
1041         struct tdb_recovery_record rec;
1042
1043         /* find the recovery area */
1044         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1045         if (recovery_head == TDB_OFF_ERR) {
1046                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1047                          "tdb_transaction_recover:"
1048                          " failed to read recovery head\n");
1049                 return -1;
1050         }
1051
1052         if (recovery_head == 0) {
1053                 /* we have never allocated a recovery record */
1054                 return 0;
1055         }
1056
1057         /* read the recovery record */
1058         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1059                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1060                          "tdb_transaction_recover:"
1061                          " failed to read recovery record\n");
1062                 return -1;
1063         }
1064
1065         if (rec.magic != TDB_RECOVERY_MAGIC) {
1066                 /* there is no valid recovery data */
1067                 return 0;
1068         }
1069
1070         if (tdb->read_only) {
1071                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1072                          "tdb_transaction_recover:"
1073                          " attempt to recover read only database\n");
1074                 tdb->ecode = TDB_ERR_CORRUPT;
1075                 return -1;
1076         }
1077
1078         recovery_eof = rec.eof;
1079
1080         data = (unsigned char *)malloc(rec.len);
1081         if (data == NULL) {
1082                 tdb->ecode = TDB_ERR_OOM;
1083                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1084                          "tdb_transaction_recover:"
1085                          " failed to allocate recovery data\n");
1086                 return -1;
1087         }
1088
1089         /* read the full recovery data */
1090         if (tdb->methods->read(tdb, recovery_head + sizeof(rec), data,
1091                                rec.len) == -1) {
1092                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1093                          "tdb_transaction_recover:"
1094                          " failed to read recovery data\n");
1095                 return -1;
1096         }
1097
1098         /* recover the file data */
1099         p = data;
1100         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1101                 tdb_off_t ofs;
1102                 tdb_len_t len;
1103                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1104                 memcpy(&ofs, p, sizeof(ofs));
1105                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1106                 p += sizeof(ofs) + sizeof(len);
1107
1108                 if (tdb->methods->write(tdb, ofs, p, len) == -1) {
1109                         free(data);
1110                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1111                                  "tdb_transaction_recover:"
1112                                  " failed to recover %zu bytes at offset %zu\n",
1113                                  (size_t)len, (size_t)ofs);
1114                         return -1;
1115                 }
1116                 p += len;
1117         }
1118
1119         free(data);
1120
1121         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1122                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1123                          "tdb_transaction_recover: failed to sync recovery\n");
1124                 return -1;
1125         }
1126
1127         /* if the recovery area is after the recovered eof then remove it */
1128         if (recovery_eof <= recovery_head) {
1129                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1130                     == -1) {
1131                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1132                                  "tdb_transaction_recover:"
1133                                  " failed to remove recovery head\n");
1134                         return -1;
1135                 }
1136         }
1137
1138         /* remove the recovery magic */
1139         if (tdb_write_off(tdb,
1140                           recovery_head
1141                           + offsetof(struct tdb_recovery_record, magic),
1142                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1143                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1144                          "tdb_transaction_recover:"
1145                          " failed to remove recovery magic\n");
1146                 return -1;
1147         }
1148
1149         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1150                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1151                          "tdb_transaction_recover: failed to sync2 recovery\n");
1152                 return -1;
1153         }
1154
1155         tdb->log(tdb, TDB_DEBUG_TRACE, tdb->log_priv,
1156                  "tdb_transaction_recover: recovered %zu byte database\n",
1157                  (size_t)recovery_eof);
1158
1159         /* all done */
1160         return 0;
1161 }
1162
1163 /* Any I/O failures we say "needs recovery". */
1164 bool tdb_needs_recovery(struct tdb_context *tdb)
1165 {
1166         tdb_off_t recovery_head;
1167         struct tdb_recovery_record rec;
1168
1169         /* find the recovery area */
1170         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1171         if (recovery_head == TDB_OFF_ERR) {
1172                 return true;
1173         }
1174
1175         if (recovery_head == 0) {
1176                 /* we have never allocated a recovery record */
1177                 return false;
1178         }
1179
1180         /* read the recovery record */
1181         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1182                 return true;
1183         }
1184
1185         return (rec.magic == TDB_RECOVERY_MAGIC);
1186 }