]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
cb516f181fa38311b3f3bc55d25716aa2f8ea91b
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocated the transaction recover record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of writes all that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit stategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tread(tdb, off, buf, len)
155                     != 0) {
156                         goto fail;
157                 }
158                 return 0;
159         }
160
161         /* it is in the block list. Now check for the last block */
162         if (blk == tdb->transaction->num_blocks-1) {
163                 if (len > tdb->transaction->last_block_size) {
164                         goto fail;
165                 }
166         }
167
168         /* now copy it out of this block */
169         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
170         return 0;
171
172 fail:
173         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
174                    "transaction_read: failed at off=%zu len=%zu",
175                    (size_t)off, (size_t)len);
176         tdb->transaction->transaction_error = 1;
177         return -1;
178 }
179
180
181 /*
182   write while in a transaction
183 */
184 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
185                              const void *buf, tdb_len_t len)
186 {
187         size_t blk;
188
189         /* Only a commit is allowed on a prepared transaction */
190         if (tdb->transaction->prepared) {
191                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
192                          "transaction_write: transaction already prepared,"
193                          " write not allowed");
194                 goto fail;
195         }
196
197         /* break it up into block sized chunks */
198         while (len + (off % getpagesize()) > getpagesize()) {
199                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
200                 if (transaction_write(tdb, off, buf, len2) != 0) {
201                         return -1;
202                 }
203                 len -= len2;
204                 off += len2;
205                 if (buf != NULL) {
206                         buf = (const void *)(len2 + (const char *)buf);
207                 }
208         }
209
210         if (len == 0) {
211                 return 0;
212         }
213
214         blk = off / getpagesize();
215         off = off % getpagesize();
216
217         if (tdb->transaction->num_blocks <= blk) {
218                 uint8_t **new_blocks;
219                 /* expand the blocks array */
220                 if (tdb->transaction->blocks == NULL) {
221                         new_blocks = (uint8_t **)malloc(
222                                 (blk+1)*sizeof(uint8_t *));
223                 } else {
224                         new_blocks = (uint8_t **)realloc(
225                                 tdb->transaction->blocks,
226                                 (blk+1)*sizeof(uint8_t *));
227                 }
228                 if (new_blocks == NULL) {
229                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
230                                    "transaction_write: failed to allocate");
231                         goto fail;
232                 }
233                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
234                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235                 tdb->transaction->blocks = new_blocks;
236                 tdb->transaction->num_blocks = blk+1;
237                 tdb->transaction->last_block_size = 0;
238         }
239
240         /* allocate and fill a block? */
241         if (tdb->transaction->blocks[blk] == NULL) {
242                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
243                 if (tdb->transaction->blocks[blk] == NULL) {
244                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
245                                    "transaction_write: failed to allocate");
246                         goto fail;
247                 }
248                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
249                         tdb_len_t len2 = getpagesize();
250                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
251                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
252                         }
253                         if (tdb->transaction->io_methods->tread(tdb, blk * getpagesize(),
254                                                                 tdb->transaction->blocks[blk],
255                                                                 len2) != 0) {
256                                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
257                                            "transaction_write: failed to"
258                                            " read old block: %s",
259                                            strerror(errno));
260                                 SAFE_FREE(tdb->transaction->blocks[blk]);
261                                 goto fail;
262                         }
263                         if (blk == tdb->transaction->num_blocks-1) {
264                                 tdb->transaction->last_block_size = len2;
265                         }
266                 }
267         }
268
269         /* overwrite part of an existing block */
270         if (buf == NULL) {
271                 memset(tdb->transaction->blocks[blk] + off, 0, len);
272         } else {
273                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
274         }
275         if (blk == tdb->transaction->num_blocks-1) {
276                 if (len + off > tdb->transaction->last_block_size) {
277                         tdb->transaction->last_block_size = len + off;
278                 }
279         }
280
281         return 0;
282
283 fail:
284         tdb->transaction->transaction_error = 1;
285         return -1;
286 }
287
288
289 /*
290   write while in a transaction - this varient never expands the transaction blocks, it only
291   updates existing blocks. This means it cannot change the recovery size
292 */
293 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
294                                        const void *buf, tdb_len_t len)
295 {
296         size_t blk;
297
298         /* break it up into block sized chunks */
299         while (len + (off % getpagesize()) > getpagesize()) {
300                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
301                 transaction_write_existing(tdb, off, buf, len2);
302                 len -= len2;
303                 off += len2;
304                 if (buf != NULL) {
305                         buf = (const void *)(len2 + (const char *)buf);
306                 }
307         }
308
309         if (len == 0) {
310                 return;
311         }
312
313         blk = off / getpagesize();
314         off = off % getpagesize();
315
316         if (tdb->transaction->num_blocks <= blk ||
317             tdb->transaction->blocks[blk] == NULL) {
318                 return;
319         }
320
321         if (blk == tdb->transaction->num_blocks-1 &&
322             off + len > tdb->transaction->last_block_size) {
323                 if (off >= tdb->transaction->last_block_size) {
324                         return;
325                 }
326                 len = tdb->transaction->last_block_size - off;
327         }
328
329         /* overwrite part of an existing block */
330         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
331 }
332
333
334 /*
335   out of bounds check during a transaction
336 */
337 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
338 {
339         if (len <= tdb->map_size) {
340                 return 0;
341         }
342         tdb->ecode = TDB_ERR_IO;
343         if (!probe) {
344                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
345                            "tdb_oob len %lld beyond transaction size %lld",
346                            (long long)len,
347                            (long long)tdb->map_size);
348         }
349         return -1;
350 }
351
352 /*
353   transaction version of tdb_expand().
354 */
355 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
356 {
357         /* add a write to the transaction elements, so subsequent
358            reads see the zero data */
359         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
360                 return -1;
361         }
362         tdb->map_size += addition;
363         return 0;
364 }
365
366 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
367                                 size_t len, bool write_mode)
368 {
369         size_t blk = off / getpagesize(), end_blk;
370
371         /* This is wrong for zero-length blocks, but will fail gracefully */
372         end_blk = (off + len - 1) / getpagesize();
373
374         /* Can only do direct if in single block and we've already copied. */
375         if (write_mode) {
376                 if (blk != end_blk)
377                         return NULL;
378                 if (blk >= tdb->transaction->num_blocks)
379                         return NULL;
380                 if (tdb->transaction->blocks[blk] == NULL)
381                         return NULL;
382                 return tdb->transaction->blocks[blk] + off % getpagesize();
383         }
384
385         /* Single which we have copied? */
386         if (blk == end_blk
387             && blk < tdb->transaction->num_blocks
388             && tdb->transaction->blocks[blk])
389                 return tdb->transaction->blocks[blk] + off % getpagesize();
390
391         /* Otherwise must be all not copied. */
392         while (blk < end_blk) {
393                 if (blk >= tdb->transaction->num_blocks)
394                         break;
395                 if (tdb->transaction->blocks[blk])
396                         return NULL;
397                 blk++;
398         }
399         return tdb->transaction->io_methods->direct(tdb, off, len, false);
400 }
401
402 static const struct tdb_methods transaction_methods = {
403         transaction_read,
404         transaction_write,
405         transaction_oob,
406         transaction_expand_file,
407         transaction_direct,
408 };
409
410 /*
411   sync to disk
412 */
413 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
414 {
415         if (tdb->flags & TDB_NOSYNC) {
416                 return 0;
417         }
418
419         if (fsync(tdb->fd) != 0) {
420                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
421                            "tdb_transaction: fsync failed: %s",
422                            strerror(errno));
423                 return -1;
424         }
425 #ifdef MS_SYNC
426         if (tdb->map_ptr) {
427                 tdb_off_t moffset = offset & ~(getpagesize()-1);
428                 if (msync(moffset + (char *)tdb->map_ptr,
429                           length + (offset - moffset), MS_SYNC) != 0) {
430                         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
431                                    "tdb_transaction: msync failed: %s",
432                                    strerror(errno));
433                         return -1;
434                 }
435         }
436 #endif
437         return 0;
438 }
439
440
441 static void _tdb_transaction_cancel(struct tdb_context *tdb)
442 {
443         int i;
444
445         if (tdb->transaction == NULL) {
446                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
447                            "tdb_transaction_cancel: no transaction");
448                 return;
449         }
450
451         if (tdb->transaction->nesting != 0) {
452                 tdb->transaction->transaction_error = 1;
453                 tdb->transaction->nesting--;
454                 return;
455         }
456
457         tdb->map_size = tdb->transaction->old_map_size;
458
459         /* free all the transaction blocks */
460         for (i=0;i<tdb->transaction->num_blocks;i++) {
461                 if (tdb->transaction->blocks[i] != NULL) {
462                         free(tdb->transaction->blocks[i]);
463                 }
464         }
465         SAFE_FREE(tdb->transaction->blocks);
466
467         if (tdb->transaction->magic_offset) {
468                 const struct tdb_methods *methods = tdb->transaction->io_methods;
469                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
470
471                 /* remove the recovery marker */
472                 if (methods->twrite(tdb, tdb->transaction->magic_offset,
473                                     &invalid, sizeof(invalid)) == -1 ||
474                     transaction_sync(tdb, tdb->transaction->magic_offset,
475                                      sizeof(invalid)) == -1) {
476                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
477                                    "tdb_transaction_cancel: failed to remove"
478                                    " recovery magic");
479                 }
480         }
481
482         if (tdb->allrecord_lock.count)
483                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
484
485         /* restore the normal io methods */
486         tdb->methods = tdb->transaction->io_methods;
487
488         tdb_transaction_unlock(tdb, F_WRLCK);
489
490         if (tdb_has_open_lock(tdb))
491                 tdb_unlock_open(tdb);
492
493         SAFE_FREE(tdb->transaction);
494 }
495
496 /*
497   start a tdb transaction. No token is returned, as only a single
498   transaction is allowed to be pending per tdb_context
499 */
500 int tdb_transaction_start(struct tdb_context *tdb)
501 {
502         enum TDB_ERROR ecode;
503
504         /* some sanity checks */
505         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
506                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
507                            "tdb_transaction_start: cannot start a transaction"
508                            " on a read-only or internal db");
509                 return -1;
510         }
511
512         /* cope with nested tdb_transaction_start() calls */
513         if (tdb->transaction != NULL) {
514                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_USE_ERROR,
515                            "tdb_transaction_start:"
516                            " already inside transaction");
517                 return -1;
518         }
519
520         if (tdb_has_hash_locks(tdb)) {
521                 /* the caller must not have any locks when starting a
522                    transaction as otherwise we'll be screwed by lack
523                    of nested locks in posix */
524                 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
525                            "tdb_transaction_start: cannot start a transaction"
526                            " with locks held");
527                 return -1;
528         }
529
530         tdb->transaction = (struct tdb_transaction *)
531                 calloc(sizeof(struct tdb_transaction), 1);
532         if (tdb->transaction == NULL) {
533                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
534                            "tdb_transaction_start: cannot allocate");
535                 return -1;
536         }
537
538         /* get the transaction write lock. This is a blocking lock. As
539            discussed with Volker, there are a number of ways we could
540            make this async, which we will probably do in the future */
541         ecode = tdb_transaction_lock(tdb, F_WRLCK);
542         if (ecode != TDB_SUCCESS) {
543                 tdb->ecode = ecode;
544                 SAFE_FREE(tdb->transaction->blocks);
545                 SAFE_FREE(tdb->transaction);
546                 return -1;
547         }
548
549         /* get a read lock over entire file. This is upgraded to a write
550            lock during the commit */
551         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
552         if (ecode != TDB_SUCCESS) {
553                 tdb->ecode = ecode;
554                 goto fail_allrecord_lock;
555         }
556
557         /* make sure we know about any file expansions already done by
558            anyone else */
559         tdb->methods->oob(tdb, tdb->map_size + 1, true);
560         tdb->transaction->old_map_size = tdb->map_size;
561
562         /* finally hook the io methods, replacing them with
563            transaction specific methods */
564         tdb->transaction->io_methods = tdb->methods;
565         tdb->methods = &transaction_methods;
566         return 0;
567
568 fail_allrecord_lock:
569         tdb_transaction_unlock(tdb, F_WRLCK);
570         SAFE_FREE(tdb->transaction->blocks);
571         SAFE_FREE(tdb->transaction);
572         return -1;
573 }
574
575
576 /*
577   cancel the current transaction
578 */
579 void tdb_transaction_cancel(struct tdb_context *tdb)
580 {
581         _tdb_transaction_cancel(tdb);
582 }
583
584 /*
585   work out how much space the linearised recovery data will consume
586 */
587 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
588 {
589         tdb_len_t recovery_size = 0;
590         int i;
591
592         recovery_size = sizeof(tdb_len_t);
593         for (i=0;i<tdb->transaction->num_blocks;i++) {
594                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
595                         break;
596                 }
597                 if (tdb->transaction->blocks[i] == NULL) {
598                         continue;
599                 }
600                 recovery_size += 2*sizeof(tdb_off_t);
601                 if (i == tdb->transaction->num_blocks-1) {
602                         recovery_size += tdb->transaction->last_block_size;
603                 } else {
604                         recovery_size += getpagesize();
605                 }
606         }
607
608         return recovery_size;
609 }
610
611 /*
612   allocate the recovery area, or use an existing recovery area if it is
613   large enough
614 */
615 static int tdb_recovery_allocate(struct tdb_context *tdb,
616                                  tdb_len_t *recovery_size,
617                                  tdb_off_t *recovery_offset,
618                                  tdb_len_t *recovery_max_size)
619 {
620         struct tdb_recovery_record rec;
621         const struct tdb_methods *methods = tdb->transaction->io_methods;
622         tdb_off_t recovery_head;
623         size_t addition;
624
625         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
626         if (recovery_head == TDB_OFF_ERR) {
627                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
628                          "tdb_recovery_allocate:"
629                          " failed to read recovery head");
630                 return -1;
631         }
632
633         if (recovery_head != 0) {
634                 if (methods->tread(tdb, recovery_head, &rec, sizeof(rec))) {
635                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
636                                  "tdb_recovery_allocate:"
637                                  " failed to read recovery record");
638                         return -1;
639                 }
640                 tdb_convert(tdb, &rec, sizeof(rec));
641                 /* ignore invalid recovery regions: can happen in crash */
642                 if (rec.magic != TDB_RECOVERY_MAGIC &&
643                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
644                         recovery_head = 0;
645                 }
646         }
647
648         *recovery_size = tdb_recovery_size(tdb);
649
650         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
651                 /* it fits in the existing area */
652                 *recovery_max_size = rec.max_len;
653                 *recovery_offset = recovery_head;
654                 return 0;
655         }
656
657         /* we need to free up the old recovery area, then allocate a
658            new one at the end of the file. Note that we cannot use
659            normal allocation to allocate the new one as that might return
660            us an area that is being currently used (as of the start of
661            the transaction) */
662         if (recovery_head != 0) {
663                 add_stat(tdb, frees, 1);
664                 if (add_free_record(tdb, recovery_head,
665                                     sizeof(rec) + rec.max_len) != 0) {
666                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
667                                    "tdb_recovery_allocate:"
668                                    " failed to free previous recovery area");
669                         return -1;
670                 }
671         }
672
673         /* the tdb_free() call might have increased the recovery size */
674         *recovery_size = tdb_recovery_size(tdb);
675
676         /* round up to a multiple of page size */
677         *recovery_max_size
678                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
679                    & ~(getpagesize()-1))
680                 - sizeof(rec);
681         *recovery_offset = tdb->map_size;
682         recovery_head = *recovery_offset;
683
684         /* Restore ->map_size before calling underlying expand_file.
685            Also so that we don't try to expand the file again in the
686            transaction commit, which would destroy the recovery
687            area */
688         addition = (tdb->map_size - tdb->transaction->old_map_size) +
689                 sizeof(rec) + *recovery_max_size;
690         tdb->map_size = tdb->transaction->old_map_size;
691         if (methods->expand_file(tdb, addition) == -1) {
692                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
693                          "tdb_recovery_allocate:"
694                          " failed to create recovery area");
695                 return -1;
696         }
697
698         /* we have to reset the old map size so that we don't try to
699            expand the file again in the transaction commit, which
700            would destroy the recovery area */
701         tdb->transaction->old_map_size = tdb->map_size;
702
703         /* write the recovery header offset and sync - we can sync without a race here
704            as the magic ptr in the recovery record has not been set */
705         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
706         if (methods->twrite(tdb, offsetof(struct tdb_header, recovery),
707                             &recovery_head, sizeof(tdb_off_t)) == -1) {
708                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
709                          "tdb_recovery_allocate:"
710                          " failed to write recovery head");
711                 return -1;
712         }
713         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
714                                    &recovery_head,
715                                    sizeof(tdb_off_t));
716         return 0;
717 }
718
719 /* Set up header for the recovery record. */
720 static void set_recovery_header(struct tdb_recovery_record *rec,
721                                 uint64_t magic,
722                                 uint64_t datalen, uint64_t actuallen,
723                                 uint64_t oldsize)
724 {
725         rec->magic = magic;
726         rec->max_len = actuallen;
727         rec->len = datalen;
728         rec->eof = oldsize;
729 }
730
731 /*
732   setup the recovery data that will be used on a crash during commit
733 */
734 static int transaction_setup_recovery(struct tdb_context *tdb,
735                                       tdb_off_t *magic_offset)
736 {
737         tdb_len_t recovery_size;
738         unsigned char *data, *p;
739         const struct tdb_methods *methods = tdb->transaction->io_methods;
740         struct tdb_recovery_record *rec;
741         tdb_off_t recovery_offset, recovery_max_size;
742         tdb_off_t old_map_size = tdb->transaction->old_map_size;
743         uint64_t magic, tailer;
744         int i;
745
746         /*
747           check that the recovery area has enough space
748         */
749         if (tdb_recovery_allocate(tdb, &recovery_size,
750                                   &recovery_offset, &recovery_max_size) == -1) {
751                 return -1;
752         }
753
754         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
755         if (data == NULL) {
756                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
757                            "transaction_setup_recovery: cannot allocate");
758                 return -1;
759         }
760
761         rec = (struct tdb_recovery_record *)data;
762         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
763                             recovery_size, recovery_max_size, old_map_size);
764         tdb_convert(tdb, rec, sizeof(*rec));
765
766         /* build the recovery data into a single blob to allow us to do a single
767            large write, which should be more efficient */
768         p = data + sizeof(*rec);
769         for (i=0;i<tdb->transaction->num_blocks;i++) {
770                 tdb_off_t offset;
771                 tdb_len_t length;
772
773                 if (tdb->transaction->blocks[i] == NULL) {
774                         continue;
775                 }
776
777                 offset = i * getpagesize();
778                 length = getpagesize();
779                 if (i == tdb->transaction->num_blocks-1) {
780                         length = tdb->transaction->last_block_size;
781                 }
782
783                 if (offset >= old_map_size) {
784                         continue;
785                 }
786                 if (offset + length > tdb->map_size) {
787                         tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
788                                    "tdb_transaction_setup_recovery:"
789                                    " transaction data over new region boundary");
790                         free(data);
791                         return -1;
792                 }
793                 memcpy(p, &offset, sizeof(offset));
794                 memcpy(p + sizeof(offset), &length, sizeof(length));
795                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
796
797                 /* the recovery area contains the old data, not the
798                    new data, so we have to call the original tdb_read
799                    method to get it */
800                 if (methods->tread(tdb, offset,
801                                    p + sizeof(offset) + sizeof(length),
802                                    length) != 0) {
803                         free(data);
804                         return -1;
805                 }
806                 p += sizeof(offset) + sizeof(length) + length;
807         }
808
809         /* and the tailer */
810         tailer = sizeof(*rec) + recovery_max_size;
811         memcpy(p, &tailer, sizeof(tailer));
812         tdb_convert(tdb, p, sizeof(tailer));
813
814         /* write the recovery data to the recovery area */
815         if (methods->twrite(tdb, recovery_offset, data,
816                             sizeof(*rec) + recovery_size) == -1) {
817                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
818                          "tdb_transaction_setup_recovery:"
819                          " failed to write recovery data");
820                 free(data);
821                 return -1;
822         }
823         transaction_write_existing(tdb, recovery_offset, data,
824                                    sizeof(*rec) + recovery_size);
825
826         /* as we don't have ordered writes, we have to sync the recovery
827            data before we update the magic to indicate that the recovery
828            data is present */
829         if (transaction_sync(tdb, recovery_offset,
830                              sizeof(*rec) + recovery_size) == -1) {
831                 free(data);
832                 return -1;
833         }
834
835         free(data);
836
837         magic = TDB_RECOVERY_MAGIC;
838         tdb_convert(tdb, &magic, sizeof(magic));
839
840         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
841                                                    magic);
842
843         if (methods->twrite(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
844                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
845                          "tdb_transaction_setup_recovery:"
846                          " failed to write recovery magic");
847                 return -1;
848         }
849         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
850
851         /* ensure the recovery magic marker is on disk */
852         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
853                 return -1;
854         }
855
856         return 0;
857 }
858
859 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
860 {
861         const struct tdb_methods *methods;
862         enum TDB_ERROR ecode;
863
864         if (tdb->transaction == NULL) {
865                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
866                            "tdb_transaction_prepare_commit: no transaction");
867                 return -1;
868         }
869
870         if (tdb->transaction->prepared) {
871                 _tdb_transaction_cancel(tdb);
872                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
873                            "tdb_transaction_prepare_commit:"
874                            " transaction already prepared");
875                 return -1;
876         }
877
878         if (tdb->transaction->transaction_error) {
879                 _tdb_transaction_cancel(tdb);
880                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
881                            "tdb_transaction_prepare_commit:"
882                            " transaction error pending");
883                 return -1;
884         }
885
886
887         if (tdb->transaction->nesting != 0) {
888                 tdb->transaction->nesting--;
889                 return 0;
890         }
891
892         /* check for a null transaction */
893         if (tdb->transaction->blocks == NULL) {
894                 return 0;
895         }
896
897         methods = tdb->transaction->io_methods;
898
899         /* upgrade the main transaction lock region to a write lock */
900         ecode = tdb_allrecord_upgrade(tdb);
901         if (ecode != TDB_SUCCESS) {
902                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
903                          "tdb_transaction_prepare_commit:"
904                          " failed to upgrade hash locks");
905                 _tdb_transaction_cancel(tdb);
906                 return -1;
907         }
908
909         /* get the open lock - this prevents new users attaching to the database
910            during the commit */
911         ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
912         if (ecode != TDB_SUCCESS) {
913                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
914                            "tdb_transaction_prepare_commit:"
915                            " failed to get open lock");
916                 _tdb_transaction_cancel(tdb);
917                 return -1;
918         }
919
920         /* Since we have whole db locked, we don't need the expansion lock. */
921         if (!(tdb->flags & TDB_NOSYNC)) {
922                 /* write the recovery data to the end of the file */
923                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
924                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
925                                  "tdb_transaction_prepare_commit:"
926                                  " failed to setup recovery data");
927                         _tdb_transaction_cancel(tdb);
928                         return -1;
929                 }
930         }
931
932         tdb->transaction->prepared = true;
933
934         /* expand the file to the new size if needed */
935         if (tdb->map_size != tdb->transaction->old_map_size) {
936                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
937                 /* Restore original map size for tdb_expand_file */
938                 tdb->map_size = tdb->transaction->old_map_size;
939                 if (methods->expand_file(tdb, add) == -1) {
940                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
941                                  "tdb_transaction_prepare_commit:"
942                                  " expansion failed");
943                         _tdb_transaction_cancel(tdb);
944                         return -1;
945                 }
946         }
947
948         /* Keep the open lock until the actual commit */
949
950         return 0;
951 }
952
953 /*
954    prepare to commit the current transaction
955 */
956 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
957 {
958         return _tdb_transaction_prepare_commit(tdb);
959 }
960
961 /*
962   commit the current transaction
963 */
964 int tdb_transaction_commit(struct tdb_context *tdb)
965 {
966         const struct tdb_methods *methods;
967         int i;
968
969         if (tdb->transaction == NULL) {
970                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
971                          "tdb_transaction_commit: no transaction");
972                 return -1;
973         }
974
975         tdb_trace(tdb, "tdb_transaction_commit");
976
977         if (tdb->transaction->nesting != 0) {
978                 tdb->transaction->nesting--;
979                 return 0;
980         }
981
982         /* check for a null transaction */
983         if (tdb->transaction->blocks == NULL) {
984                 _tdb_transaction_cancel(tdb);
985                 return 0;
986         }
987
988         if (!tdb->transaction->prepared) {
989                 int ret = _tdb_transaction_prepare_commit(tdb);
990                 if (ret)
991                         return ret;
992         }
993
994         methods = tdb->transaction->io_methods;
995
996         /* perform all the writes */
997         for (i=0;i<tdb->transaction->num_blocks;i++) {
998                 tdb_off_t offset;
999                 tdb_len_t length;
1000
1001                 if (tdb->transaction->blocks[i] == NULL) {
1002                         continue;
1003                 }
1004
1005                 offset = i * getpagesize();
1006                 length = getpagesize();
1007                 if (i == tdb->transaction->num_blocks-1) {
1008                         length = tdb->transaction->last_block_size;
1009                 }
1010
1011                 if (methods->twrite(tdb, offset, tdb->transaction->blocks[i],
1012                                     length) == -1) {
1013                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1014                                    "tdb_transaction_commit:"
1015                                    " write failed during commit");
1016
1017                         /* we've overwritten part of the data and
1018                            possibly expanded the file, so we need to
1019                            run the crash recovery code */
1020                         tdb->methods = methods;
1021                         tdb_transaction_recover(tdb);
1022
1023                         _tdb_transaction_cancel(tdb);
1024
1025                         return -1;
1026                 }
1027                 SAFE_FREE(tdb->transaction->blocks[i]);
1028         }
1029
1030         SAFE_FREE(tdb->transaction->blocks);
1031         tdb->transaction->num_blocks = 0;
1032
1033         /* ensure the new data is on disk */
1034         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1035                 return -1;
1036         }
1037
1038         /*
1039           TODO: maybe write to some dummy hdr field, or write to magic
1040           offset without mmap, before the last sync, instead of the
1041           utime() call
1042         */
1043
1044         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1045            don't change the mtime of the file, this means the file may
1046            not be backed up (as tdb rounding to block sizes means that
1047            file size changes are quite rare too). The following forces
1048            mtime changes when a transaction completes */
1049 #if HAVE_UTIME
1050         utime(tdb->name, NULL);
1051 #endif
1052
1053         /* use a transaction cancel to free memory and remove the
1054            transaction locks */
1055         _tdb_transaction_cancel(tdb);
1056
1057         return 0;
1058 }
1059
1060
1061 /*
1062   recover from an aborted transaction. Must be called with exclusive
1063   database write access already established (including the open
1064   lock to prevent new processes attaching)
1065 */
1066 int tdb_transaction_recover(struct tdb_context *tdb)
1067 {
1068         tdb_off_t recovery_head, recovery_eof;
1069         unsigned char *data, *p;
1070         struct tdb_recovery_record rec;
1071
1072         /* find the recovery area */
1073         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1074         if (recovery_head == TDB_OFF_ERR) {
1075                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1076                          "tdb_transaction_recover:"
1077                          " failed to read recovery head");
1078                 return -1;
1079         }
1080
1081         if (recovery_head == 0) {
1082                 /* we have never allocated a recovery record */
1083                 return 0;
1084         }
1085
1086         /* read the recovery record */
1087         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1088                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1089                            "tdb_transaction_recover:"
1090                            " failed to read recovery record");
1091                 return -1;
1092         }
1093
1094         if (rec.magic != TDB_RECOVERY_MAGIC) {
1095                 /* there is no valid recovery data */
1096                 return 0;
1097         }
1098
1099         if (tdb->read_only) {
1100                 tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1101                            "tdb_transaction_recover:"
1102                            " attempt to recover read only database");
1103                 return -1;
1104         }
1105
1106         recovery_eof = rec.eof;
1107
1108         data = (unsigned char *)malloc(rec.len);
1109         if (data == NULL) {
1110                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1111                            "tdb_transaction_recover:"
1112                            " failed to allocate recovery data");
1113                 return -1;
1114         }
1115
1116         /* read the full recovery data */
1117         if (tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1118                                 rec.len) == -1) {
1119                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1120                            "tdb_transaction_recover:"
1121                            " failed to read recovery data");
1122                 return -1;
1123         }
1124
1125         /* recover the file data */
1126         p = data;
1127         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1128                 tdb_off_t ofs;
1129                 tdb_len_t len;
1130                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1131                 memcpy(&ofs, p, sizeof(ofs));
1132                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1133                 p += sizeof(ofs) + sizeof(len);
1134
1135                 if (tdb->methods->twrite(tdb, ofs, p, len) == -1) {
1136                         free(data);
1137                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1138                                  "tdb_transaction_recover:"
1139                                  " failed to recover %zu bytes at offset %zu",
1140                                  (size_t)len, (size_t)ofs);
1141                         return -1;
1142                 }
1143                 p += len;
1144         }
1145
1146         free(data);
1147
1148         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1149                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1150                            "tdb_transaction_recover: failed to sync recovery");
1151                 return -1;
1152         }
1153
1154         /* if the recovery area is after the recovered eof then remove it */
1155         if (recovery_eof <= recovery_head) {
1156                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1157                     == -1) {
1158                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1159                                  "tdb_transaction_recover:"
1160                                  " failed to remove recovery head");
1161                         return -1;
1162                 }
1163         }
1164
1165         /* remove the recovery magic */
1166         if (tdb_write_off(tdb,
1167                           recovery_head
1168                           + offsetof(struct tdb_recovery_record, magic),
1169                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1170                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1171                          "tdb_transaction_recover:"
1172                          " failed to remove recovery magic");
1173                 return -1;
1174         }
1175
1176         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1177                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1178                          "tdb_transaction_recover: failed to sync2 recovery");
1179                 return -1;
1180         }
1181
1182         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1183                    "tdb_transaction_recover: recovered %zu byte database",
1184                    (size_t)recovery_eof);
1185
1186         /* all done */
1187         return 0;
1188 }
1189
1190 /* Any I/O failures we say "needs recovery". */
1191 bool tdb_needs_recovery(struct tdb_context *tdb)
1192 {
1193         tdb_off_t recovery_head;
1194         struct tdb_recovery_record rec;
1195
1196         /* find the recovery area */
1197         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1198         if (recovery_head == TDB_OFF_ERR) {
1199                 return true;
1200         }
1201
1202         if (recovery_head == 0) {
1203                 /* we have never allocated a recovery record */
1204                 return false;
1205         }
1206
1207         /* read the recovery record */
1208         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1209                 return true;
1210         }
1211
1212         return (rec.magic == TDB_RECOVERY_MAGIC);
1213 }