]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
6b2954dbe64de152b6c9ae28a4fdd2688ead18b9
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Release x and reset it to NULL so the stale pointer cannot be reused.
   free(NULL) is defined to do nothing, so no NULL test is required. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
/*
  hold the context of any current transaction. One of these hangs off
  tdb->transaction from tdb_transaction_start() until the transaction is
  cancelled (or otherwise ended).
*/
struct tdb_transaction {
	/* the original io methods - used to do IOs to the real db */
	const struct tdb_methods *io_methods;

	/* the list of transaction blocks, indexed by block number
	   (file offset / getpagesize()). An entry stays NULL until that
	   block is first written to, at which point it gets created in
	   this list */
	uint8_t **blocks;
	size_t num_blocks; /* number of entries in blocks[] */
	size_t last_block_size; /* number of valid bytes in the last block */

	/* non-zero when an internal transaction error has
	   occurred. All write operations will then fail until the
	   transaction is ended */
	int transaction_error;

	/* nesting depth of tdb_transaction_start() calls. A cancel with a
	   non-zero depth only decrements this and flags an error.
	   NOTE(review): tdb_transaction_start() as written in this file
	   rejects nested starts - confirm where this is incremented */
	int nesting;

	/* set when a prepare has already occurred */
	bool prepared;
	/* file offset of the recovery magic; when non-zero, cancel
	   overwrites it with TDB_RECOVERY_INVALID_MAGIC */
	tdb_off_t magic_offset;

	/* old file size before transaction */
	tdb_len_t old_map_size;
};
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tread(tdb, off, buf, len)
155                     != 0) {
156                         goto fail;
157                 }
158                 return 0;
159         }
160
161         /* it is in the block list. Now check for the last block */
162         if (blk == tdb->transaction->num_blocks-1) {
163                 if (len > tdb->transaction->last_block_size) {
164                         goto fail;
165                 }
166         }
167
168         /* now copy it out of this block */
169         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
170         return 0;
171
172 fail:
173         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
174                    "transaction_read: failed at off=%zu len=%zu",
175                    (size_t)off, (size_t)len);
176         tdb->transaction->transaction_error = 1;
177         return -1;
178 }
179
180
181 /*
182   write while in a transaction
183 */
184 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
185                              const void *buf, tdb_len_t len)
186 {
187         size_t blk;
188
189         /* Only a commit is allowed on a prepared transaction */
190         if (tdb->transaction->prepared) {
191                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
192                          "transaction_write: transaction already prepared,"
193                          " write not allowed");
194                 goto fail;
195         }
196
197         /* break it up into block sized chunks */
198         while (len + (off % getpagesize()) > getpagesize()) {
199                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
200                 if (transaction_write(tdb, off, buf, len2) != 0) {
201                         return -1;
202                 }
203                 len -= len2;
204                 off += len2;
205                 if (buf != NULL) {
206                         buf = (const void *)(len2 + (const char *)buf);
207                 }
208         }
209
210         if (len == 0) {
211                 return 0;
212         }
213
214         blk = off / getpagesize();
215         off = off % getpagesize();
216
217         if (tdb->transaction->num_blocks <= blk) {
218                 uint8_t **new_blocks;
219                 /* expand the blocks array */
220                 if (tdb->transaction->blocks == NULL) {
221                         new_blocks = (uint8_t **)malloc(
222                                 (blk+1)*sizeof(uint8_t *));
223                 } else {
224                         new_blocks = (uint8_t **)realloc(
225                                 tdb->transaction->blocks,
226                                 (blk+1)*sizeof(uint8_t *));
227                 }
228                 if (new_blocks == NULL) {
229                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
230                                    "transaction_write: failed to allocate");
231                         goto fail;
232                 }
233                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
234                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235                 tdb->transaction->blocks = new_blocks;
236                 tdb->transaction->num_blocks = blk+1;
237                 tdb->transaction->last_block_size = 0;
238         }
239
240         /* allocate and fill a block? */
241         if (tdb->transaction->blocks[blk] == NULL) {
242                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
243                 if (tdb->transaction->blocks[blk] == NULL) {
244                         tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
245                                    "transaction_write: failed to allocate");
246                         goto fail;
247                 }
248                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
249                         tdb_len_t len2 = getpagesize();
250                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
251                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
252                         }
253                         if (tdb->transaction->io_methods->tread(tdb, blk * getpagesize(),
254                                                                 tdb->transaction->blocks[blk],
255                                                                 len2) != 0) {
256                                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
257                                            "transaction_write: failed to"
258                                            " read old block: %s",
259                                            strerror(errno));
260                                 SAFE_FREE(tdb->transaction->blocks[blk]);
261                                 goto fail;
262                         }
263                         if (blk == tdb->transaction->num_blocks-1) {
264                                 tdb->transaction->last_block_size = len2;
265                         }
266                 }
267         }
268
269         /* overwrite part of an existing block */
270         if (buf == NULL) {
271                 memset(tdb->transaction->blocks[blk] + off, 0, len);
272         } else {
273                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
274         }
275         if (blk == tdb->transaction->num_blocks-1) {
276                 if (len + off > tdb->transaction->last_block_size) {
277                         tdb->transaction->last_block_size = len + off;
278                 }
279         }
280
281         return 0;
282
283 fail:
284         tdb->transaction->transaction_error = 1;
285         return -1;
286 }
287
288
289 /*
290   write while in a transaction - this varient never expands the transaction blocks, it only
291   updates existing blocks. This means it cannot change the recovery size
292 */
293 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
294                                        const void *buf, tdb_len_t len)
295 {
296         size_t blk;
297
298         /* break it up into block sized chunks */
299         while (len + (off % getpagesize()) > getpagesize()) {
300                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
301                 transaction_write_existing(tdb, off, buf, len2);
302                 len -= len2;
303                 off += len2;
304                 if (buf != NULL) {
305                         buf = (const void *)(len2 + (const char *)buf);
306                 }
307         }
308
309         if (len == 0) {
310                 return;
311         }
312
313         blk = off / getpagesize();
314         off = off % getpagesize();
315
316         if (tdb->transaction->num_blocks <= blk ||
317             tdb->transaction->blocks[blk] == NULL) {
318                 return;
319         }
320
321         if (blk == tdb->transaction->num_blocks-1 &&
322             off + len > tdb->transaction->last_block_size) {
323                 if (off >= tdb->transaction->last_block_size) {
324                         return;
325                 }
326                 len = tdb->transaction->last_block_size - off;
327         }
328
329         /* overwrite part of an existing block */
330         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
331 }
332
333
334 /*
335   out of bounds check during a transaction
336 */
337 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
338 {
339         if (len <= tdb->map_size) {
340                 return 0;
341         }
342         tdb->ecode = TDB_ERR_IO;
343         if (!probe) {
344                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
345                            "tdb_oob len %lld beyond transaction size %lld",
346                            (long long)len,
347                            (long long)tdb->map_size);
348         }
349         return -1;
350 }
351
352 /*
353   transaction version of tdb_expand().
354 */
355 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
356 {
357         /* add a write to the transaction elements, so subsequent
358            reads see the zero data */
359         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
360                 return -1;
361         }
362         tdb->map_size += addition;
363         return 0;
364 }
365
366 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
367                                 size_t len, bool write_mode)
368 {
369         size_t blk = off / getpagesize(), end_blk;
370
371         /* This is wrong for zero-length blocks, but will fail gracefully */
372         end_blk = (off + len - 1) / getpagesize();
373
374         /* Can only do direct if in single block and we've already copied. */
375         if (write_mode) {
376                 if (blk != end_blk)
377                         return NULL;
378                 if (blk >= tdb->transaction->num_blocks)
379                         return NULL;
380                 if (tdb->transaction->blocks[blk] == NULL)
381                         return NULL;
382                 return tdb->transaction->blocks[blk] + off % getpagesize();
383         }
384
385         /* Single which we have copied? */
386         if (blk == end_blk
387             && blk < tdb->transaction->num_blocks
388             && tdb->transaction->blocks[blk])
389                 return tdb->transaction->blocks[blk] + off % getpagesize();
390
391         /* Otherwise must be all not copied. */
392         while (blk < end_blk) {
393                 if (blk >= tdb->transaction->num_blocks)
394                         break;
395                 if (tdb->transaction->blocks[blk])
396                         return NULL;
397                 blk++;
398         }
399         return tdb->transaction->io_methods->direct(tdb, off, len, false);
400 }
401
402 static const struct tdb_methods transaction_methods = {
403         transaction_read,
404         transaction_write,
405         transaction_oob,
406         transaction_expand_file,
407         transaction_direct,
408 };
409
410 /*
411   sync to disk
412 */
413 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
414 {
415         if (tdb->flags & TDB_NOSYNC) {
416                 return 0;
417         }
418
419         if (fsync(tdb->fd) != 0) {
420                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
421                            "tdb_transaction: fsync failed: %s",
422                            strerror(errno));
423                 return -1;
424         }
425 #ifdef MS_SYNC
426         if (tdb->map_ptr) {
427                 tdb_off_t moffset = offset & ~(getpagesize()-1);
428                 if (msync(moffset + (char *)tdb->map_ptr,
429                           length + (offset - moffset), MS_SYNC) != 0) {
430                         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
431                                    "tdb_transaction: msync failed: %s",
432                                    strerror(errno));
433                         return -1;
434                 }
435         }
436 #endif
437         return 0;
438 }
439
440
441 static void _tdb_transaction_cancel(struct tdb_context *tdb)
442 {
443         int i;
444
445         if (tdb->transaction == NULL) {
446                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
447                            "tdb_transaction_cancel: no transaction");
448                 return;
449         }
450
451         if (tdb->transaction->nesting != 0) {
452                 tdb->transaction->transaction_error = 1;
453                 tdb->transaction->nesting--;
454                 return;
455         }
456
457         tdb->map_size = tdb->transaction->old_map_size;
458
459         /* free all the transaction blocks */
460         for (i=0;i<tdb->transaction->num_blocks;i++) {
461                 if (tdb->transaction->blocks[i] != NULL) {
462                         free(tdb->transaction->blocks[i]);
463                 }
464         }
465         SAFE_FREE(tdb->transaction->blocks);
466
467         if (tdb->transaction->magic_offset) {
468                 const struct tdb_methods *methods = tdb->transaction->io_methods;
469                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
470
471                 /* remove the recovery marker */
472                 if (methods->twrite(tdb, tdb->transaction->magic_offset,
473                                     &invalid, sizeof(invalid)) == -1 ||
474                     transaction_sync(tdb, tdb->transaction->magic_offset,
475                                      sizeof(invalid)) == -1) {
476                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
477                                    "tdb_transaction_cancel: failed to remove"
478                                    " recovery magic");
479                 }
480         }
481
482         if (tdb->allrecord_lock.count)
483                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
484
485         /* restore the normal io methods */
486         tdb->methods = tdb->transaction->io_methods;
487
488         tdb_transaction_unlock(tdb, F_WRLCK);
489
490         if (tdb_has_open_lock(tdb))
491                 tdb_unlock_open(tdb);
492
493         SAFE_FREE(tdb->transaction);
494 }
495
496 /*
497   start a tdb transaction. No token is returned, as only a single
498   transaction is allowed to be pending per tdb_context
499 */
500 int tdb_transaction_start(struct tdb_context *tdb)
501 {
502         /* some sanity checks */
503         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
504                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
505                            "tdb_transaction_start: cannot start a transaction"
506                            " on a read-only or internal db");
507                 return -1;
508         }
509
510         /* cope with nested tdb_transaction_start() calls */
511         if (tdb->transaction != NULL) {
512                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_USE_ERROR,
513                            "tdb_transaction_start:"
514                            " already inside transaction");
515                 return -1;
516         }
517
518         if (tdb_has_hash_locks(tdb)) {
519                 /* the caller must not have any locks when starting a
520                    transaction as otherwise we'll be screwed by lack
521                    of nested locks in posix */
522                 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
523                            "tdb_transaction_start: cannot start a transaction"
524                            " with locks held");
525                 return -1;
526         }
527
528         tdb->transaction = (struct tdb_transaction *)
529                 calloc(sizeof(struct tdb_transaction), 1);
530         if (tdb->transaction == NULL) {
531                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
532                            "tdb_transaction_start: cannot allocate");
533                 return -1;
534         }
535
536         /* get the transaction write lock. This is a blocking lock. As
537            discussed with Volker, there are a number of ways we could
538            make this async, which we will probably do in the future */
539         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
540                 SAFE_FREE(tdb->transaction->blocks);
541                 SAFE_FREE(tdb->transaction);
542                 return -1;
543         }
544
545         /* get a read lock over entire file. This is upgraded to a write
546            lock during the commit */
547         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
548                 goto fail_allrecord_lock;
549         }
550
551         /* make sure we know about any file expansions already done by
552            anyone else */
553         tdb->methods->oob(tdb, tdb->map_size + 1, true);
554         tdb->transaction->old_map_size = tdb->map_size;
555
556         /* finally hook the io methods, replacing them with
557            transaction specific methods */
558         tdb->transaction->io_methods = tdb->methods;
559         tdb->methods = &transaction_methods;
560         return 0;
561
562 fail_allrecord_lock:
563         tdb_transaction_unlock(tdb, F_WRLCK);
564         SAFE_FREE(tdb->transaction->blocks);
565         SAFE_FREE(tdb->transaction);
566         return -1;
567 }
568
569
/*
  cancel the current transaction. Public entry point; all of the work is
  done by _tdb_transaction_cancel(), which logs and returns if no
  transaction is active.
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
	_tdb_transaction_cancel(tdb);
}
577
578 /*
579   work out how much space the linearised recovery data will consume
580 */
581 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
582 {
583         tdb_len_t recovery_size = 0;
584         int i;
585
586         recovery_size = sizeof(tdb_len_t);
587         for (i=0;i<tdb->transaction->num_blocks;i++) {
588                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
589                         break;
590                 }
591                 if (tdb->transaction->blocks[i] == NULL) {
592                         continue;
593                 }
594                 recovery_size += 2*sizeof(tdb_off_t);
595                 if (i == tdb->transaction->num_blocks-1) {
596                         recovery_size += tdb->transaction->last_block_size;
597                 } else {
598                         recovery_size += getpagesize();
599                 }
600         }
601
602         return recovery_size;
603 }
604
/*
  allocate the recovery area, or use an existing recovery area if it is
  large enough.

  On success returns 0 and fills in:
    *recovery_size     - bytes of recovery data needed (tdb_recovery_size())
    *recovery_offset   - file offset of the recovery record to use
    *recovery_max_size - capacity of that area, excluding the record header
  Returns -1 (after logging) on any failure.

  When a new area is needed it is placed at the end of the file, the file
  is expanded through the real (non-transaction) io methods, and the
  header's recovery pointer is updated both on disk and in any
  transaction block that already shadows it.
*/
static int tdb_recovery_allocate(struct tdb_context *tdb,
				 tdb_len_t *recovery_size,
				 tdb_off_t *recovery_offset,
				 tdb_len_t *recovery_max_size)
{
	struct tdb_recovery_record rec;
	/* the real db methods, bypassing the transaction hooks */
	const struct tdb_methods *methods = tdb->transaction->io_methods;
	tdb_off_t recovery_head;
	size_t addition;

	/* find the current recovery area (0 means none) */
	recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
	if (recovery_head == TDB_OFF_ERR) {
		tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
			 "tdb_recovery_allocate:"
			 " failed to read recovery head");
		return -1;
	}

	if (recovery_head != 0) {
		if (methods->tread(tdb, recovery_head, &rec, sizeof(rec))) {
			tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
				 "tdb_recovery_allocate:"
				 " failed to read recovery record");
			return -1;
		}
		tdb_convert(tdb, &rec, sizeof(rec));
		/* ignore invalid recovery regions: can happen in crash */
		if (rec.magic != TDB_RECOVERY_MAGIC &&
		    rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
			recovery_head = 0;
		}
	}

	*recovery_size = tdb_recovery_size(tdb);

	/* rec is only examined when recovery_head != 0, i.e. when it was
	   read (and judged valid) above */
	if (recovery_head != 0 && *recovery_size <= rec.max_len) {
		/* it fits in the existing area */
		*recovery_max_size = rec.max_len;
		*recovery_offset = recovery_head;
		return 0;
	}

	/* we need to free up the old recovery area, then allocate a
	   new one at the end of the file. Note that we cannot use
	   normal allocation to allocate the new one as that might return
	   us an area that is being currently used (as of the start of
	   the transaction) */
	if (recovery_head != 0) {
		add_stat(tdb, frees, 1);
		if (add_free_record(tdb, recovery_head,
				    sizeof(rec) + rec.max_len) != 0) {
			tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
				   "tdb_recovery_allocate:"
				   " failed to free previous recovery area");
			return -1;
		}
	}

	/* freeing the old area above might have increased the recovery
	   size, so compute it again */
	*recovery_size = tdb_recovery_size(tdb);

	/* round up to a multiple of page size: header plus data is padded
	   to whole pages, and the header is then subtracted back out */
	*recovery_max_size
		= (((sizeof(rec) + *recovery_size) + getpagesize()-1)
		   & ~(getpagesize()-1))
		- sizeof(rec);
	/* the new area starts at the current (transaction) end of file */
	*recovery_offset = tdb->map_size;
	recovery_head = *recovery_offset;

	/* Restore ->map_size before calling underlying expand_file.
	   Also so that we don't try to expand the file again in the
	   transaction commit, which would destroy the recovery
	   area. The addition covers any growth already made inside the
	   transaction plus the new recovery record and its data area */
	addition = (tdb->map_size - tdb->transaction->old_map_size) +
		sizeof(rec) + *recovery_max_size;
	tdb->map_size = tdb->transaction->old_map_size;
	if (methods->expand_file(tdb, addition) == -1) {
		tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
			 "tdb_recovery_allocate:"
			 " failed to create recovery area");
		return -1;
	}

	/* we have to reset the old map size so that we don't try to
	   expand the file again in the transaction commit, which
	   would destroy the recovery area */
	tdb->transaction->old_map_size = tdb->map_size;

	/* write the recovery header offset and sync - we can sync without a race here
	   as the magic ptr in the recovery record has not been set */
	tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
	if (methods->twrite(tdb, offsetof(struct tdb_header, recovery),
			    &recovery_head, sizeof(tdb_off_t)) == -1) {
		tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
			 "tdb_recovery_allocate:"
			 " failed to write recovery head");
		return -1;
	}
	/* keep any transaction block shadowing the header in step with
	   what was just written to the real file */
	transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
				   &recovery_head,
				   sizeof(tdb_off_t));
	return 0;
}
712
713 /* Set up header for the recovery record. */
714 static void set_recovery_header(struct tdb_recovery_record *rec,
715                                 uint64_t magic,
716                                 uint64_t datalen, uint64_t actuallen,
717                                 uint64_t oldsize)
718 {
719         rec->magic = magic;
720         rec->max_len = actuallen;
721         rec->len = datalen;
722         rec->eof = oldsize;
723 }
724
725 /*
726   setup the recovery data that will be used on a crash during commit
727 */
728 static int transaction_setup_recovery(struct tdb_context *tdb,
729                                       tdb_off_t *magic_offset)
730 {
731         tdb_len_t recovery_size;
732         unsigned char *data, *p;
733         const struct tdb_methods *methods = tdb->transaction->io_methods;
734         struct tdb_recovery_record *rec;
735         tdb_off_t recovery_offset, recovery_max_size;
736         tdb_off_t old_map_size = tdb->transaction->old_map_size;
737         uint64_t magic, tailer;
738         int i;
739
740         /*
741           check that the recovery area has enough space
742         */
743         if (tdb_recovery_allocate(tdb, &recovery_size,
744                                   &recovery_offset, &recovery_max_size) == -1) {
745                 return -1;
746         }
747
748         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
749         if (data == NULL) {
750                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
751                            "transaction_setup_recovery: cannot allocate");
752                 return -1;
753         }
754
755         rec = (struct tdb_recovery_record *)data;
756         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
757                             recovery_size, recovery_max_size, old_map_size);
758         tdb_convert(tdb, rec, sizeof(*rec));
759
760         /* build the recovery data into a single blob to allow us to do a single
761            large write, which should be more efficient */
762         p = data + sizeof(*rec);
763         for (i=0;i<tdb->transaction->num_blocks;i++) {
764                 tdb_off_t offset;
765                 tdb_len_t length;
766
767                 if (tdb->transaction->blocks[i] == NULL) {
768                         continue;
769                 }
770
771                 offset = i * getpagesize();
772                 length = getpagesize();
773                 if (i == tdb->transaction->num_blocks-1) {
774                         length = tdb->transaction->last_block_size;
775                 }
776
777                 if (offset >= old_map_size) {
778                         continue;
779                 }
780                 if (offset + length > tdb->map_size) {
781                         tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
782                                    "tdb_transaction_setup_recovery:"
783                                    " transaction data over new region boundary");
784                         free(data);
785                         return -1;
786                 }
787                 memcpy(p, &offset, sizeof(offset));
788                 memcpy(p + sizeof(offset), &length, sizeof(length));
789                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
790
791                 /* the recovery area contains the old data, not the
792                    new data, so we have to call the original tdb_read
793                    method to get it */
794                 if (methods->tread(tdb, offset,
795                                    p + sizeof(offset) + sizeof(length),
796                                    length) != 0) {
797                         free(data);
798                         return -1;
799                 }
800                 p += sizeof(offset) + sizeof(length) + length;
801         }
802
803         /* and the tailer */
804         tailer = sizeof(*rec) + recovery_max_size;
805         memcpy(p, &tailer, sizeof(tailer));
806         tdb_convert(tdb, p, sizeof(tailer));
807
808         /* write the recovery data to the recovery area */
809         if (methods->twrite(tdb, recovery_offset, data,
810                             sizeof(*rec) + recovery_size) == -1) {
811                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
812                          "tdb_transaction_setup_recovery:"
813                          " failed to write recovery data");
814                 free(data);
815                 return -1;
816         }
817         transaction_write_existing(tdb, recovery_offset, data,
818                                    sizeof(*rec) + recovery_size);
819
820         /* as we don't have ordered writes, we have to sync the recovery
821            data before we update the magic to indicate that the recovery
822            data is present */
823         if (transaction_sync(tdb, recovery_offset,
824                              sizeof(*rec) + recovery_size) == -1) {
825                 free(data);
826                 return -1;
827         }
828
829         free(data);
830
831         magic = TDB_RECOVERY_MAGIC;
832         tdb_convert(tdb, &magic, sizeof(magic));
833
834         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
835                                                    magic);
836
837         if (methods->twrite(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
838                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
839                          "tdb_transaction_setup_recovery:"
840                          " failed to write recovery magic");
841                 return -1;
842         }
843         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
844
845         /* ensure the recovery magic marker is on disk */
846         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
847                 return -1;
848         }
849
850         return 0;
851 }
852
853 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
854 {
855         const struct tdb_methods *methods;
856
857         if (tdb->transaction == NULL) {
858                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
859                            "tdb_transaction_prepare_commit: no transaction");
860                 return -1;
861         }
862
863         if (tdb->transaction->prepared) {
864                 _tdb_transaction_cancel(tdb);
865                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
866                            "tdb_transaction_prepare_commit:"
867                            " transaction already prepared");
868                 return -1;
869         }
870
871         if (tdb->transaction->transaction_error) {
872                 _tdb_transaction_cancel(tdb);
873                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
874                            "tdb_transaction_prepare_commit:"
875                            " transaction error pending");
876                 return -1;
877         }
878
879
880         if (tdb->transaction->nesting != 0) {
881                 tdb->transaction->nesting--;
882                 return 0;
883         }
884
885         /* check for a null transaction */
886         if (tdb->transaction->blocks == NULL) {
887                 return 0;
888         }
889
890         methods = tdb->transaction->io_methods;
891
892         /* upgrade the main transaction lock region to a write lock */
893         if (tdb_allrecord_upgrade(tdb) == -1) {
894                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
895                          "tdb_transaction_prepare_commit:"
896                          " failed to upgrade hash locks");
897                 _tdb_transaction_cancel(tdb);
898                 return -1;
899         }
900
901         /* get the open lock - this prevents new users attaching to the database
902            during the commit */
903         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
904                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
905                          "tdb_transaction_prepare_commit:"
906                          " failed to get open lock");
907                 _tdb_transaction_cancel(tdb);
908                 return -1;
909         }
910
911         /* Since we have whole db locked, we don't need the expansion lock. */
912         if (!(tdb->flags & TDB_NOSYNC)) {
913                 /* write the recovery data to the end of the file */
914                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
915                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
916                                  "tdb_transaction_prepare_commit:"
917                                  " failed to setup recovery data");
918                         _tdb_transaction_cancel(tdb);
919                         return -1;
920                 }
921         }
922
923         tdb->transaction->prepared = true;
924
925         /* expand the file to the new size if needed */
926         if (tdb->map_size != tdb->transaction->old_map_size) {
927                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
928                 /* Restore original map size for tdb_expand_file */
929                 tdb->map_size = tdb->transaction->old_map_size;
930                 if (methods->expand_file(tdb, add) == -1) {
931                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
932                                  "tdb_transaction_prepare_commit:"
933                                  " expansion failed");
934                         _tdb_transaction_cancel(tdb);
935                         return -1;
936                 }
937         }
938
939         /* Keep the open lock until the actual commit */
940
941         return 0;
942 }
943
/*
   public entry point for the first phase of a commit; all the work is
   done by _tdb_transaction_prepare_commit() and its result is returned
   unchanged
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
        int ret = _tdb_transaction_prepare_commit(tdb);

        return ret;
}
951
952 /*
953   commit the current transaction
954 */
955 int tdb_transaction_commit(struct tdb_context *tdb)
956 {
957         const struct tdb_methods *methods;
958         int i;
959
960         if (tdb->transaction == NULL) {
961                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
962                          "tdb_transaction_commit: no transaction");
963                 return -1;
964         }
965
966         tdb_trace(tdb, "tdb_transaction_commit");
967
968         if (tdb->transaction->nesting != 0) {
969                 tdb->transaction->nesting--;
970                 return 0;
971         }
972
973         /* check for a null transaction */
974         if (tdb->transaction->blocks == NULL) {
975                 _tdb_transaction_cancel(tdb);
976                 return 0;
977         }
978
979         if (!tdb->transaction->prepared) {
980                 int ret = _tdb_transaction_prepare_commit(tdb);
981                 if (ret)
982                         return ret;
983         }
984
985         methods = tdb->transaction->io_methods;
986
987         /* perform all the writes */
988         for (i=0;i<tdb->transaction->num_blocks;i++) {
989                 tdb_off_t offset;
990                 tdb_len_t length;
991
992                 if (tdb->transaction->blocks[i] == NULL) {
993                         continue;
994                 }
995
996                 offset = i * getpagesize();
997                 length = getpagesize();
998                 if (i == tdb->transaction->num_blocks-1) {
999                         length = tdb->transaction->last_block_size;
1000                 }
1001
1002                 if (methods->twrite(tdb, offset, tdb->transaction->blocks[i],
1003                                     length) == -1) {
1004                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1005                                    "tdb_transaction_commit:"
1006                                    " write failed during commit");
1007
1008                         /* we've overwritten part of the data and
1009                            possibly expanded the file, so we need to
1010                            run the crash recovery code */
1011                         tdb->methods = methods;
1012                         tdb_transaction_recover(tdb);
1013
1014                         _tdb_transaction_cancel(tdb);
1015
1016                         return -1;
1017                 }
1018                 SAFE_FREE(tdb->transaction->blocks[i]);
1019         }
1020
1021         SAFE_FREE(tdb->transaction->blocks);
1022         tdb->transaction->num_blocks = 0;
1023
1024         /* ensure the new data is on disk */
1025         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1026                 return -1;
1027         }
1028
1029         /*
1030           TODO: maybe write to some dummy hdr field, or write to magic
1031           offset without mmap, before the last sync, instead of the
1032           utime() call
1033         */
1034
1035         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1036            don't change the mtime of the file, this means the file may
1037            not be backed up (as tdb rounding to block sizes means that
1038            file size changes are quite rare too). The following forces
1039            mtime changes when a transaction completes */
1040 #if HAVE_UTIME
1041         utime(tdb->name, NULL);
1042 #endif
1043
1044         /* use a transaction cancel to free memory and remove the
1045            transaction locks */
1046         _tdb_transaction_cancel(tdb);
1047
1048         return 0;
1049 }
1050
1051
1052 /*
1053   recover from an aborted transaction. Must be called with exclusive
1054   database write access already established (including the open
1055   lock to prevent new processes attaching)
1056 */
1057 int tdb_transaction_recover(struct tdb_context *tdb)
1058 {
1059         tdb_off_t recovery_head, recovery_eof;
1060         unsigned char *data, *p;
1061         struct tdb_recovery_record rec;
1062
1063         /* find the recovery area */
1064         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1065         if (recovery_head == TDB_OFF_ERR) {
1066                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1067                          "tdb_transaction_recover:"
1068                          " failed to read recovery head");
1069                 return -1;
1070         }
1071
1072         if (recovery_head == 0) {
1073                 /* we have never allocated a recovery record */
1074                 return 0;
1075         }
1076
1077         /* read the recovery record */
1078         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1079                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1080                            "tdb_transaction_recover:"
1081                            " failed to read recovery record");
1082                 return -1;
1083         }
1084
1085         if (rec.magic != TDB_RECOVERY_MAGIC) {
1086                 /* there is no valid recovery data */
1087                 return 0;
1088         }
1089
1090         if (tdb->read_only) {
1091                 tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1092                            "tdb_transaction_recover:"
1093                            " attempt to recover read only database");
1094                 return -1;
1095         }
1096
1097         recovery_eof = rec.eof;
1098
1099         data = (unsigned char *)malloc(rec.len);
1100         if (data == NULL) {
1101                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1102                            "tdb_transaction_recover:"
1103                            " failed to allocate recovery data");
1104                 return -1;
1105         }
1106
1107         /* read the full recovery data */
1108         if (tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1109                                 rec.len) == -1) {
1110                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1111                            "tdb_transaction_recover:"
1112                            " failed to read recovery data");
1113                 return -1;
1114         }
1115
1116         /* recover the file data */
1117         p = data;
1118         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1119                 tdb_off_t ofs;
1120                 tdb_len_t len;
1121                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1122                 memcpy(&ofs, p, sizeof(ofs));
1123                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1124                 p += sizeof(ofs) + sizeof(len);
1125
1126                 if (tdb->methods->twrite(tdb, ofs, p, len) == -1) {
1127                         free(data);
1128                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1129                                  "tdb_transaction_recover:"
1130                                  " failed to recover %zu bytes at offset %zu",
1131                                  (size_t)len, (size_t)ofs);
1132                         return -1;
1133                 }
1134                 p += len;
1135         }
1136
1137         free(data);
1138
1139         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1140                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1141                            "tdb_transaction_recover: failed to sync recovery");
1142                 return -1;
1143         }
1144
1145         /* if the recovery area is after the recovered eof then remove it */
1146         if (recovery_eof <= recovery_head) {
1147                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1148                     == -1) {
1149                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1150                                  "tdb_transaction_recover:"
1151                                  " failed to remove recovery head");
1152                         return -1;
1153                 }
1154         }
1155
1156         /* remove the recovery magic */
1157         if (tdb_write_off(tdb,
1158                           recovery_head
1159                           + offsetof(struct tdb_recovery_record, magic),
1160                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1161                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1162                          "tdb_transaction_recover:"
1163                          " failed to remove recovery magic");
1164                 return -1;
1165         }
1166
1167         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1168                 tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
1169                          "tdb_transaction_recover: failed to sync2 recovery");
1170                 return -1;
1171         }
1172
1173         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1174                    "tdb_transaction_recover: recovered %zu byte database",
1175                    (size_t)recovery_eof);
1176
1177         /* all done */
1178         return 0;
1179 }
1180
1181 /* Any I/O failures we say "needs recovery". */
1182 bool tdb_needs_recovery(struct tdb_context *tdb)
1183 {
1184         tdb_off_t recovery_head;
1185         struct tdb_recovery_record rec;
1186
1187         /* find the recovery area */
1188         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1189         if (recovery_head == TDB_OFF_ERR) {
1190                 return true;
1191         }
1192
1193         if (recovery_head == 0) {
1194                 /* we have never allocated a recovery record */
1195                 return false;
1196         }
1197
1198         /* read the recovery record */
1199         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1200                 return true;
1201         }
1202
1203         return (rec.magic == TDB_RECOVERY_MAGIC);
1204 }