]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
tdb2: simplify logging levels, rename TDB_DEBUG_* to TDB_LOG_*
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Release x and reset it to NULL so the stale pointer cannot be reused.
   free(NULL) is a no-op, so no guard is needed.  x is expanded more than
   once and therefore must not have side effects. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tread(tdb, off, buf, len)
155                     != 0) {
156                         goto fail;
157                 }
158                 return 0;
159         }
160
161         /* it is in the block list. Now check for the last block */
162         if (blk == tdb->transaction->num_blocks-1) {
163                 if (len > tdb->transaction->last_block_size) {
164                         goto fail;
165                 }
166         }
167
168         /* now copy it out of this block */
169         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
170         return 0;
171
172 fail:
173         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
174                    "transaction_read: failed at off=%zu len=%zu",
175                    (size_t)off, (size_t)len);
176         tdb->transaction->transaction_error = 1;
177         return -1;
178 }
179
180
181 /*
182   write while in a transaction
183 */
184 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
185                              const void *buf, tdb_len_t len)
186 {
187         size_t blk;
188
189         /* Only a commit is allowed on a prepared transaction */
190         if (tdb->transaction->prepared) {
191                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
192                          "transaction_write: transaction already prepared,"
193                          " write not allowed");
194                 goto fail;
195         }
196
197         /* break it up into block sized chunks */
198         while (len + (off % getpagesize()) > getpagesize()) {
199                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
200                 if (transaction_write(tdb, off, buf, len2) != 0) {
201                         return -1;
202                 }
203                 len -= len2;
204                 off += len2;
205                 if (buf != NULL) {
206                         buf = (const void *)(len2 + (const char *)buf);
207                 }
208         }
209
210         if (len == 0) {
211                 return 0;
212         }
213
214         blk = off / getpagesize();
215         off = off % getpagesize();
216
217         if (tdb->transaction->num_blocks <= blk) {
218                 uint8_t **new_blocks;
219                 /* expand the blocks array */
220                 if (tdb->transaction->blocks == NULL) {
221                         new_blocks = (uint8_t **)malloc(
222                                 (blk+1)*sizeof(uint8_t *));
223                 } else {
224                         new_blocks = (uint8_t **)realloc(
225                                 tdb->transaction->blocks,
226                                 (blk+1)*sizeof(uint8_t *));
227                 }
228                 if (new_blocks == NULL) {
229                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
230                                    "transaction_write: failed to allocate");
231                         goto fail;
232                 }
233                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
234                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235                 tdb->transaction->blocks = new_blocks;
236                 tdb->transaction->num_blocks = blk+1;
237                 tdb->transaction->last_block_size = 0;
238         }
239
240         /* allocate and fill a block? */
241         if (tdb->transaction->blocks[blk] == NULL) {
242                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
243                 if (tdb->transaction->blocks[blk] == NULL) {
244                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
245                                    "transaction_write: failed to allocate");
246                         goto fail;
247                 }
248                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
249                         tdb_len_t len2 = getpagesize();
250                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
251                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
252                         }
253                         if (tdb->transaction->io_methods->tread(tdb, blk * getpagesize(),
254                                                                 tdb->transaction->blocks[blk],
255                                                                 len2) != 0) {
256                                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
257                                            "transaction_write: failed to"
258                                            " read old block: %s",
259                                            strerror(errno));
260                                 SAFE_FREE(tdb->transaction->blocks[blk]);
261                                 goto fail;
262                         }
263                         if (blk == tdb->transaction->num_blocks-1) {
264                                 tdb->transaction->last_block_size = len2;
265                         }
266                 }
267         }
268
269         /* overwrite part of an existing block */
270         if (buf == NULL) {
271                 memset(tdb->transaction->blocks[blk] + off, 0, len);
272         } else {
273                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
274         }
275         if (blk == tdb->transaction->num_blocks-1) {
276                 if (len + off > tdb->transaction->last_block_size) {
277                         tdb->transaction->last_block_size = len + off;
278                 }
279         }
280
281         return 0;
282
283 fail:
284         tdb->transaction->transaction_error = 1;
285         return -1;
286 }
287
288
289 /*
290   write while in a transaction - this varient never expands the transaction blocks, it only
291   updates existing blocks. This means it cannot change the recovery size
292 */
293 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
294                                        const void *buf, tdb_len_t len)
295 {
296         size_t blk;
297
298         /* break it up into block sized chunks */
299         while (len + (off % getpagesize()) > getpagesize()) {
300                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
301                 transaction_write_existing(tdb, off, buf, len2);
302                 len -= len2;
303                 off += len2;
304                 if (buf != NULL) {
305                         buf = (const void *)(len2 + (const char *)buf);
306                 }
307         }
308
309         if (len == 0) {
310                 return;
311         }
312
313         blk = off / getpagesize();
314         off = off % getpagesize();
315
316         if (tdb->transaction->num_blocks <= blk ||
317             tdb->transaction->blocks[blk] == NULL) {
318                 return;
319         }
320
321         if (blk == tdb->transaction->num_blocks-1 &&
322             off + len > tdb->transaction->last_block_size) {
323                 if (off >= tdb->transaction->last_block_size) {
324                         return;
325                 }
326                 len = tdb->transaction->last_block_size - off;
327         }
328
329         /* overwrite part of an existing block */
330         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
331 }
332
333
334 /*
335   out of bounds check during a transaction
336 */
337 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
338 {
339         if (len <= tdb->map_size) {
340                 return 0;
341         }
342         tdb->ecode = TDB_ERR_IO;
343         if (!probe) {
344                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
345                            "tdb_oob len %lld beyond transaction size %lld",
346                            (long long)len,
347                            (long long)tdb->map_size);
348         }
349         return -1;
350 }
351
352 /*
353   transaction version of tdb_expand().
354 */
355 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
356 {
357         /* add a write to the transaction elements, so subsequent
358            reads see the zero data */
359         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
360                 return -1;
361         }
362         tdb->map_size += addition;
363         return 0;
364 }
365
366 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
367                                 size_t len, bool write_mode)
368 {
369         size_t blk = off / getpagesize(), end_blk;
370
371         /* This is wrong for zero-length blocks, but will fail gracefully */
372         end_blk = (off + len - 1) / getpagesize();
373
374         /* Can only do direct if in single block and we've already copied. */
375         if (write_mode) {
376                 if (blk != end_blk)
377                         return NULL;
378                 if (blk >= tdb->transaction->num_blocks)
379                         return NULL;
380                 if (tdb->transaction->blocks[blk] == NULL)
381                         return NULL;
382                 return tdb->transaction->blocks[blk] + off % getpagesize();
383         }
384
385         /* Single which we have copied? */
386         if (blk == end_blk
387             && blk < tdb->transaction->num_blocks
388             && tdb->transaction->blocks[blk])
389                 return tdb->transaction->blocks[blk] + off % getpagesize();
390
391         /* Otherwise must be all not copied. */
392         while (blk < end_blk) {
393                 if (blk >= tdb->transaction->num_blocks)
394                         break;
395                 if (tdb->transaction->blocks[blk])
396                         return NULL;
397                 blk++;
398         }
399         return tdb->transaction->io_methods->direct(tdb, off, len, false);
400 }
401
402 static const struct tdb_methods transaction_methods = {
403         transaction_read,
404         transaction_write,
405         transaction_oob,
406         transaction_expand_file,
407         transaction_direct,
408 };
409
410 /*
411   sync to disk
412 */
413 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
414 {
415         if (tdb->flags & TDB_NOSYNC) {
416                 return 0;
417         }
418
419         if (fsync(tdb->fd) != 0) {
420                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
421                            "tdb_transaction: fsync failed: %s",
422                            strerror(errno));
423                 return -1;
424         }
425 #ifdef MS_SYNC
426         if (tdb->map_ptr) {
427                 tdb_off_t moffset = offset & ~(getpagesize()-1);
428                 if (msync(moffset + (char *)tdb->map_ptr,
429                           length + (offset - moffset), MS_SYNC) != 0) {
430                         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
431                                    "tdb_transaction: msync failed: %s",
432                                    strerror(errno));
433                         return -1;
434                 }
435         }
436 #endif
437         return 0;
438 }
439
440
441 static void _tdb_transaction_cancel(struct tdb_context *tdb)
442 {
443         int i;
444
445         if (tdb->transaction == NULL) {
446                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
447                            "tdb_transaction_cancel: no transaction");
448                 return;
449         }
450
451         if (tdb->transaction->nesting != 0) {
452                 tdb->transaction->transaction_error = 1;
453                 tdb->transaction->nesting--;
454                 return;
455         }
456
457         tdb->map_size = tdb->transaction->old_map_size;
458
459         /* free all the transaction blocks */
460         for (i=0;i<tdb->transaction->num_blocks;i++) {
461                 if (tdb->transaction->blocks[i] != NULL) {
462                         free(tdb->transaction->blocks[i]);
463                 }
464         }
465         SAFE_FREE(tdb->transaction->blocks);
466
467         if (tdb->transaction->magic_offset) {
468                 const struct tdb_methods *methods = tdb->transaction->io_methods;
469                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
470
471                 /* remove the recovery marker */
472                 if (methods->twrite(tdb, tdb->transaction->magic_offset,
473                                     &invalid, sizeof(invalid)) == -1 ||
474                     transaction_sync(tdb, tdb->transaction->magic_offset,
475                                      sizeof(invalid)) == -1) {
476                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
477                                    "tdb_transaction_cancel: failed to remove"
478                                    " recovery magic");
479                 }
480         }
481
482         if (tdb->allrecord_lock.count)
483                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
484
485         /* restore the normal io methods */
486         tdb->methods = tdb->transaction->io_methods;
487
488         tdb_transaction_unlock(tdb, F_WRLCK);
489
490         if (tdb_has_open_lock(tdb))
491                 tdb_unlock_open(tdb);
492
493         SAFE_FREE(tdb->transaction);
494 }
495
496 /*
497   start a tdb transaction. No token is returned, as only a single
498   transaction is allowed to be pending per tdb_context
499 */
500 int tdb_transaction_start(struct tdb_context *tdb)
501 {
502         /* some sanity checks */
503         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
504                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
505                            "tdb_transaction_start: cannot start a transaction"
506                            " on a read-only or internal db");
507                 return -1;
508         }
509
510         /* cope with nested tdb_transaction_start() calls */
511         if (tdb->transaction != NULL) {
512                 tdb_logerr(tdb, TDB_ERR_NESTING, TDB_LOG_USE_ERROR,
513                            "tdb_transaction_start:"
514                            " already inside transaction");
515                 return -1;
516         }
517
518         if (tdb_has_hash_locks(tdb)) {
519                 /* the caller must not have any locks when starting a
520                    transaction as otherwise we'll be screwed by lack
521                    of nested locks in posix */
522                 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
523                            "tdb_transaction_start: cannot start a transaction"
524                            " with locks held");
525                 return -1;
526         }
527
528         tdb->transaction = (struct tdb_transaction *)
529                 calloc(sizeof(struct tdb_transaction), 1);
530         if (tdb->transaction == NULL) {
531                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
532                            "tdb_transaction_start: cannot allocate");
533                 return -1;
534         }
535
536         /* get the transaction write lock. This is a blocking lock. As
537            discussed with Volker, there are a number of ways we could
538            make this async, which we will probably do in the future */
539         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
540                 SAFE_FREE(tdb->transaction->blocks);
541                 SAFE_FREE(tdb->transaction);
542                 return -1;
543         }
544
545         /* get a read lock over entire file. This is upgraded to a write
546            lock during the commit */
547         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
548                 goto fail_allrecord_lock;
549         }
550
551         /* make sure we know about any file expansions already done by
552            anyone else */
553         tdb->methods->oob(tdb, tdb->map_size + 1, true);
554         tdb->transaction->old_map_size = tdb->map_size;
555
556         /* finally hook the io methods, replacing them with
557            transaction specific methods */
558         tdb->transaction->io_methods = tdb->methods;
559         tdb->methods = &transaction_methods;
560         return 0;
561
562 fail_allrecord_lock:
563         tdb_transaction_unlock(tdb, F_WRLCK);
564         SAFE_FREE(tdb->transaction->blocks);
565         SAFE_FREE(tdb->transaction);
566         return -1;
567 }
568
569
/*
  Public entry point for abandoning the current transaction; all the
  work is done by _tdb_transaction_cancel().
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
        _tdb_transaction_cancel(tdb);
}
577
578 /*
579   work out how much space the linearised recovery data will consume
580 */
581 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
582 {
583         tdb_len_t recovery_size = 0;
584         int i;
585
586         recovery_size = sizeof(tdb_len_t);
587         for (i=0;i<tdb->transaction->num_blocks;i++) {
588                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
589                         break;
590                 }
591                 if (tdb->transaction->blocks[i] == NULL) {
592                         continue;
593                 }
594                 recovery_size += 2*sizeof(tdb_off_t);
595                 if (i == tdb->transaction->num_blocks-1) {
596                         recovery_size += tdb->transaction->last_block_size;
597                 } else {
598                         recovery_size += getpagesize();
599                 }
600         }
601
602         return recovery_size;
603 }
604
605 /*
606   allocate the recovery area, or use an existing recovery area if it is
607   large enough
608 */
609 static int tdb_recovery_allocate(struct tdb_context *tdb,
610                                  tdb_len_t *recovery_size,
611                                  tdb_off_t *recovery_offset,
612                                  tdb_len_t *recovery_max_size)
613 {
614         struct tdb_recovery_record rec;
615         const struct tdb_methods *methods = tdb->transaction->io_methods;
616         tdb_off_t recovery_head;
617         size_t addition;
618
619         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
620         if (recovery_head == TDB_OFF_ERR) {
621                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
622                          "tdb_recovery_allocate:"
623                          " failed to read recovery head");
624                 return -1;
625         }
626
627         if (recovery_head != 0) {
628                 if (methods->tread(tdb, recovery_head, &rec, sizeof(rec))) {
629                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
630                                  "tdb_recovery_allocate:"
631                                  " failed to read recovery record");
632                         return -1;
633                 }
634                 tdb_convert(tdb, &rec, sizeof(rec));
635                 /* ignore invalid recovery regions: can happen in crash */
636                 if (rec.magic != TDB_RECOVERY_MAGIC &&
637                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
638                         recovery_head = 0;
639                 }
640         }
641
642         *recovery_size = tdb_recovery_size(tdb);
643
644         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
645                 /* it fits in the existing area */
646                 *recovery_max_size = rec.max_len;
647                 *recovery_offset = recovery_head;
648                 return 0;
649         }
650
651         /* we need to free up the old recovery area, then allocate a
652            new one at the end of the file. Note that we cannot use
653            normal allocation to allocate the new one as that might return
654            us an area that is being currently used (as of the start of
655            the transaction) */
656         if (recovery_head != 0) {
657                 add_stat(tdb, frees, 1);
658                 if (add_free_record(tdb, recovery_head,
659                                     sizeof(rec) + rec.max_len) != 0) {
660                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
661                                    "tdb_recovery_allocate:"
662                                    " failed to free previous recovery area");
663                         return -1;
664                 }
665         }
666
667         /* the tdb_free() call might have increased the recovery size */
668         *recovery_size = tdb_recovery_size(tdb);
669
670         /* round up to a multiple of page size */
671         *recovery_max_size
672                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
673                    & ~(getpagesize()-1))
674                 - sizeof(rec);
675         *recovery_offset = tdb->map_size;
676         recovery_head = *recovery_offset;
677
678         /* Restore ->map_size before calling underlying expand_file.
679            Also so that we don't try to expand the file again in the
680            transaction commit, which would destroy the recovery
681            area */
682         addition = (tdb->map_size - tdb->transaction->old_map_size) +
683                 sizeof(rec) + *recovery_max_size;
684         tdb->map_size = tdb->transaction->old_map_size;
685         if (methods->expand_file(tdb, addition) == -1) {
686                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
687                          "tdb_recovery_allocate:"
688                          " failed to create recovery area");
689                 return -1;
690         }
691
692         /* we have to reset the old map size so that we don't try to
693            expand the file again in the transaction commit, which
694            would destroy the recovery area */
695         tdb->transaction->old_map_size = tdb->map_size;
696
697         /* write the recovery header offset and sync - we can sync without a race here
698            as the magic ptr in the recovery record has not been set */
699         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
700         if (methods->twrite(tdb, offsetof(struct tdb_header, recovery),
701                             &recovery_head, sizeof(tdb_off_t)) == -1) {
702                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
703                          "tdb_recovery_allocate:"
704                          " failed to write recovery head");
705                 return -1;
706         }
707         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
708                                    &recovery_head,
709                                    sizeof(tdb_off_t));
710         return 0;
711 }
712
713 /* Set up header for the recovery record. */
714 static void set_recovery_header(struct tdb_recovery_record *rec,
715                                 uint64_t magic,
716                                 uint64_t datalen, uint64_t actuallen,
717                                 uint64_t oldsize)
718 {
719         rec->magic = magic;
720         rec->max_len = actuallen;
721         rec->len = datalen;
722         rec->eof = oldsize;
723 }
724
725 /*
726   setup the recovery data that will be used on a crash during commit
727 */
728 static int transaction_setup_recovery(struct tdb_context *tdb,
729                                       tdb_off_t *magic_offset)
730 {
731         tdb_len_t recovery_size;
732         unsigned char *data, *p;
733         const struct tdb_methods *methods = tdb->transaction->io_methods;
734         struct tdb_recovery_record *rec;
735         tdb_off_t recovery_offset, recovery_max_size;
736         tdb_off_t old_map_size = tdb->transaction->old_map_size;
737         uint64_t magic, tailer;
738         int i;
739
740         /*
741           check that the recovery area has enough space
742         */
743         if (tdb_recovery_allocate(tdb, &recovery_size,
744                                   &recovery_offset, &recovery_max_size) == -1) {
745                 return -1;
746         }
747
748         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
749         if (data == NULL) {
750                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
751                            "transaction_setup_recovery: cannot allocate");
752                 return -1;
753         }
754
755         rec = (struct tdb_recovery_record *)data;
756         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
757                             recovery_size, recovery_max_size, old_map_size);
758         tdb_convert(tdb, rec, sizeof(*rec));
759
760         /* build the recovery data into a single blob to allow us to do a single
761            large write, which should be more efficient */
762         p = data + sizeof(*rec);
763         for (i=0;i<tdb->transaction->num_blocks;i++) {
764                 tdb_off_t offset;
765                 tdb_len_t length;
766
767                 if (tdb->transaction->blocks[i] == NULL) {
768                         continue;
769                 }
770
771                 offset = i * getpagesize();
772                 length = getpagesize();
773                 if (i == tdb->transaction->num_blocks-1) {
774                         length = tdb->transaction->last_block_size;
775                 }
776
777                 if (offset >= old_map_size) {
778                         continue;
779                 }
780                 if (offset + length > tdb->map_size) {
781                         tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
782                                    "tdb_transaction_setup_recovery:"
783                                    " transaction data over new region boundary");
784                         free(data);
785                         return -1;
786                 }
787                 memcpy(p, &offset, sizeof(offset));
788                 memcpy(p + sizeof(offset), &length, sizeof(length));
789                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
790
791                 /* the recovery area contains the old data, not the
792                    new data, so we have to call the original tdb_read
793                    method to get it */
794                 if (methods->tread(tdb, offset,
795                                    p + sizeof(offset) + sizeof(length),
796                                    length) != 0) {
797                         free(data);
798                         return -1;
799                 }
800                 p += sizeof(offset) + sizeof(length) + length;
801         }
802
803         /* and the tailer */
804         tailer = sizeof(*rec) + recovery_max_size;
805         memcpy(p, &tailer, sizeof(tailer));
806         tdb_convert(tdb, p, sizeof(tailer));
807
808         /* write the recovery data to the recovery area */
809         if (methods->twrite(tdb, recovery_offset, data,
810                             sizeof(*rec) + recovery_size) == -1) {
811                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
812                          "tdb_transaction_setup_recovery:"
813                          " failed to write recovery data");
814                 free(data);
815                 return -1;
816         }
817         transaction_write_existing(tdb, recovery_offset, data,
818                                    sizeof(*rec) + recovery_size);
819
820         /* as we don't have ordered writes, we have to sync the recovery
821            data before we update the magic to indicate that the recovery
822            data is present */
823         if (transaction_sync(tdb, recovery_offset,
824                              sizeof(*rec) + recovery_size) == -1) {
825                 free(data);
826                 return -1;
827         }
828
829         free(data);
830
831         magic = TDB_RECOVERY_MAGIC;
832         tdb_convert(tdb, &magic, sizeof(magic));
833
834         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
835                                                    magic);
836
837         if (methods->twrite(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
838                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
839                          "tdb_transaction_setup_recovery:"
840                          " failed to write recovery magic");
841                 return -1;
842         }
843         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
844
845         /* ensure the recovery magic marker is on disk */
846         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
847                 return -1;
848         }
849
850         return 0;
851 }
852
853 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
854 {
855         const struct tdb_methods *methods;
856
857         if (tdb->transaction == NULL) {
858                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
859                            "tdb_transaction_prepare_commit: no transaction");
860                 return -1;
861         }
862
863         if (tdb->transaction->prepared) {
864                 _tdb_transaction_cancel(tdb);
865                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
866                            "tdb_transaction_prepare_commit:"
867                            " transaction already prepared");
868                 return -1;
869         }
870
871         if (tdb->transaction->transaction_error) {
872                 _tdb_transaction_cancel(tdb);
873                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
874                            "tdb_transaction_prepare_commit:"
875                            " transaction error pending");
876                 return -1;
877         }
878
879
880         if (tdb->transaction->nesting != 0) {
881                 tdb->transaction->nesting--;
882                 return 0;
883         }
884
885         /* check for a null transaction */
886         if (tdb->transaction->blocks == NULL) {
887                 return 0;
888         }
889
890         methods = tdb->transaction->io_methods;
891
892         /* upgrade the main transaction lock region to a write lock */
893         if (tdb_allrecord_upgrade(tdb) == -1) {
894                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
895                          "tdb_transaction_prepare_commit:"
896                          " failed to upgrade hash locks");
897                 _tdb_transaction_cancel(tdb);
898                 return -1;
899         }
900
901         /* get the open lock - this prevents new users attaching to the database
902            during the commit */
903         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
904                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
905                          "tdb_transaction_prepare_commit:"
906                          " failed to get open lock");
907                 _tdb_transaction_cancel(tdb);
908                 return -1;
909         }
910
911         /* Since we have whole db locked, we don't need the expansion lock. */
912         if (!(tdb->flags & TDB_NOSYNC)) {
913                 /* write the recovery data to the end of the file */
914                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
915                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
916                                  "tdb_transaction_prepare_commit:"
917                                  " failed to setup recovery data");
918                         _tdb_transaction_cancel(tdb);
919                         return -1;
920                 }
921         }
922
923         tdb->transaction->prepared = true;
924
925         /* expand the file to the new size if needed */
926         if (tdb->map_size != tdb->transaction->old_map_size) {
927                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
928                 /* Restore original map size for tdb_expand_file */
929                 tdb->map_size = tdb->transaction->old_map_size;
930                 if (methods->expand_file(tdb, add) == -1) {
931                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
932                                  "tdb_transaction_prepare_commit:"
933                                  " expansion failed");
934                         _tdb_transaction_cancel(tdb);
935                         return -1;
936                 }
937         }
938
939         /* Keep the open lock until the actual commit */
940
941         return 0;
942 }
943
/*
   Public entry point for preparing the current transaction for commit.
   Forwards to _tdb_transaction_prepare_commit() and hands back its
   result unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
        int rc = _tdb_transaction_prepare_commit(tdb);

        return rc;
}
951
952 /*
953   commit the current transaction
954 */
955 int tdb_transaction_commit(struct tdb_context *tdb)
956 {
957         const struct tdb_methods *methods;
958         int i;
959
960         if (tdb->transaction == NULL) {
961                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
962                          "tdb_transaction_commit: no transaction");
963                 return -1;
964         }
965
966         tdb_trace(tdb, "tdb_transaction_commit");
967
968         if (tdb->transaction->transaction_error) {
969                 tdb_transaction_cancel(tdb);
970                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
971                            "tdb_transaction_commit:"
972                            " transaction error pending");
973                 return -1;
974         }
975
976
977         if (tdb->transaction->nesting != 0) {
978                 tdb->transaction->nesting--;
979                 return 0;
980         }
981
982         /* check for a null transaction */
983         if (tdb->transaction->blocks == NULL) {
984                 _tdb_transaction_cancel(tdb);
985                 return 0;
986         }
987
988         if (!tdb->transaction->prepared) {
989                 int ret = _tdb_transaction_prepare_commit(tdb);
990                 if (ret)
991                         return ret;
992         }
993
994         methods = tdb->transaction->io_methods;
995
996         /* perform all the writes */
997         for (i=0;i<tdb->transaction->num_blocks;i++) {
998                 tdb_off_t offset;
999                 tdb_len_t length;
1000
1001                 if (tdb->transaction->blocks[i] == NULL) {
1002                         continue;
1003                 }
1004
1005                 offset = i * getpagesize();
1006                 length = getpagesize();
1007                 if (i == tdb->transaction->num_blocks-1) {
1008                         length = tdb->transaction->last_block_size;
1009                 }
1010
1011                 if (methods->twrite(tdb, offset, tdb->transaction->blocks[i],
1012                                     length) == -1) {
1013                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1014                                    "tdb_transaction_commit:"
1015                                    " write failed during commit");
1016
1017                         /* we've overwritten part of the data and
1018                            possibly expanded the file, so we need to
1019                            run the crash recovery code */
1020                         tdb->methods = methods;
1021                         tdb_transaction_recover(tdb);
1022
1023                         _tdb_transaction_cancel(tdb);
1024
1025                         return -1;
1026                 }
1027                 SAFE_FREE(tdb->transaction->blocks[i]);
1028         }
1029
1030         SAFE_FREE(tdb->transaction->blocks);
1031         tdb->transaction->num_blocks = 0;
1032
1033         /* ensure the new data is on disk */
1034         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1035                 return -1;
1036         }
1037
1038         /*
1039           TODO: maybe write to some dummy hdr field, or write to magic
1040           offset without mmap, before the last sync, instead of the
1041           utime() call
1042         */
1043
1044         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1045            don't change the mtime of the file, this means the file may
1046            not be backed up (as tdb rounding to block sizes means that
1047            file size changes are quite rare too). The following forces
1048            mtime changes when a transaction completes */
1049 #if HAVE_UTIME
1050         utime(tdb->name, NULL);
1051 #endif
1052
1053         /* use a transaction cancel to free memory and remove the
1054            transaction locks */
1055         _tdb_transaction_cancel(tdb);
1056
1057         return 0;
1058 }
1059
1060
1061 /*
1062   recover from an aborted transaction. Must be called with exclusive
1063   database write access already established (including the open
1064   lock to prevent new processes attaching)
1065 */
1066 int tdb_transaction_recover(struct tdb_context *tdb)
1067 {
1068         tdb_off_t recovery_head, recovery_eof;
1069         unsigned char *data, *p;
1070         struct tdb_recovery_record rec;
1071
1072         /* find the recovery area */
1073         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1074         if (recovery_head == TDB_OFF_ERR) {
1075                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1076                          "tdb_transaction_recover:"
1077                          " failed to read recovery head");
1078                 return -1;
1079         }
1080
1081         if (recovery_head == 0) {
1082                 /* we have never allocated a recovery record */
1083                 return 0;
1084         }
1085
1086         /* read the recovery record */
1087         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1088                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1089                            "tdb_transaction_recover:"
1090                            " failed to read recovery record");
1091                 return -1;
1092         }
1093
1094         if (rec.magic != TDB_RECOVERY_MAGIC) {
1095                 /* there is no valid recovery data */
1096                 return 0;
1097         }
1098
1099         if (tdb->read_only) {
1100                 tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1101                            "tdb_transaction_recover:"
1102                            " attempt to recover read only database");
1103                 return -1;
1104         }
1105
1106         recovery_eof = rec.eof;
1107
1108         data = (unsigned char *)malloc(rec.len);
1109         if (data == NULL) {
1110                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1111                            "tdb_transaction_recover:"
1112                            " failed to allocate recovery data");
1113                 return -1;
1114         }
1115
1116         /* read the full recovery data */
1117         if (tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1118                                 rec.len) == -1) {
1119                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1120                            "tdb_transaction_recover:"
1121                            " failed to read recovery data");
1122                 return -1;
1123         }
1124
1125         /* recover the file data */
1126         p = data;
1127         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1128                 tdb_off_t ofs;
1129                 tdb_len_t len;
1130                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1131                 memcpy(&ofs, p, sizeof(ofs));
1132                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1133                 p += sizeof(ofs) + sizeof(len);
1134
1135                 if (tdb->methods->twrite(tdb, ofs, p, len) == -1) {
1136                         free(data);
1137                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1138                                  "tdb_transaction_recover:"
1139                                  " failed to recover %zu bytes at offset %zu",
1140                                  (size_t)len, (size_t)ofs);
1141                         return -1;
1142                 }
1143                 p += len;
1144         }
1145
1146         free(data);
1147
1148         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1149                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1150                            "tdb_transaction_recover: failed to sync recovery");
1151                 return -1;
1152         }
1153
1154         /* if the recovery area is after the recovered eof then remove it */
1155         if (recovery_eof <= recovery_head) {
1156                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1157                     == -1) {
1158                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1159                                  "tdb_transaction_recover:"
1160                                  " failed to remove recovery head");
1161                         return -1;
1162                 }
1163         }
1164
1165         /* remove the recovery magic */
1166         if (tdb_write_off(tdb,
1167                           recovery_head
1168                           + offsetof(struct tdb_recovery_record, magic),
1169                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1170                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1171                          "tdb_transaction_recover:"
1172                          " failed to remove recovery magic");
1173                 return -1;
1174         }
1175
1176         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1177                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1178                          "tdb_transaction_recover: failed to sync2 recovery");
1179                 return -1;
1180         }
1181
1182         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1183                    "tdb_transaction_recover: recovered %zu byte database",
1184                    (size_t)recovery_eof);
1185
1186         /* all done */
1187         return 0;
1188 }
1189
1190 /* Any I/O failures we say "needs recovery". */
1191 bool tdb_needs_recovery(struct tdb_context *tdb)
1192 {
1193         tdb_off_t recovery_head;
1194         struct tdb_recovery_record rec;
1195
1196         /* find the recovery area */
1197         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1198         if (recovery_head == TDB_OFF_ERR) {
1199                 return true;
1200         }
1201
1202         if (recovery_head == 0) {
1203                 /* we have never allocated a recovery record */
1204                 return false;
1205         }
1206
1207         /* read the recovery record */
1208         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1209                 return true;
1210         }
1211
1212         return (rec.magic == TDB_RECOVERY_MAGIC);
1213 }