]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
29192beece4e3e48f0a6a337c677af2e14c0de63
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x and reset it to NULL so that a stale pointer is never reused.
   free(NULL) is a no-op, so no NULL test is needed.  x is evaluated
   more than once: do not pass an expression with side effects. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
124 #define PAGESIZE 4096
125
126 /*
127   read while in a transaction. We need to check first if the data is in our list
128   of transaction elements, then if not do a real read
129 */
130 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
131                                        void *buf, tdb_len_t len)
132 {
133         size_t blk;
134         enum TDB_ERROR ecode;
135
136         /* break it down into block sized ops */
137         while (len + (off % PAGESIZE) > PAGESIZE) {
138                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
139                 ecode = transaction_read(tdb, off, buf, len2);
140                 if (ecode != TDB_SUCCESS) {
141                         return ecode;
142                 }
143                 len -= len2;
144                 off += len2;
145                 buf = (void *)(len2 + (char *)buf);
146         }
147
148         if (len == 0) {
149                 return TDB_SUCCESS;
150         }
151
152         blk = off / PAGESIZE;
153
154         /* see if we have it in the block list */
155         if (tdb->transaction->num_blocks <= blk ||
156             tdb->transaction->blocks[blk] == NULL) {
157                 /* nope, do a real read */
158                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
159                 if (ecode != TDB_SUCCESS) {
160                         goto fail;
161                 }
162                 return 0;
163         }
164
165         /* it is in the block list. Now check for the last block */
166         if (blk == tdb->transaction->num_blocks-1) {
167                 if (len > tdb->transaction->last_block_size) {
168                         ecode = TDB_ERR_IO;
169                         goto fail;
170                 }
171         }
172
173         /* now copy it out of this block */
174         memcpy(buf, tdb->transaction->blocks[blk] + (off % PAGESIZE), len);
175         return TDB_SUCCESS;
176
177 fail:
178         tdb->transaction->transaction_error = 1;
179         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
180                           "transaction_read: failed at off=%zu len=%zu",
181                           (size_t)off, (size_t)len);
182 }
183
184
185 /*
186   write while in a transaction
187 */
188 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
189                                         const void *buf, tdb_len_t len)
190 {
191         size_t blk;
192         enum TDB_ERROR ecode;
193
194         /* Only a commit is allowed on a prepared transaction */
195         if (tdb->transaction->prepared) {
196                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
197                                    "transaction_write: transaction already"
198                                    " prepared, write not allowed");
199                 goto fail;
200         }
201
202         /* break it up into block sized chunks */
203         while (len + (off % PAGESIZE) > PAGESIZE) {
204                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
205                 ecode = transaction_write(tdb, off, buf, len2);
206                 if (ecode != TDB_SUCCESS) {
207                         return -1;
208                 }
209                 len -= len2;
210                 off += len2;
211                 if (buf != NULL) {
212                         buf = (const void *)(len2 + (const char *)buf);
213                 }
214         }
215
216         if (len == 0) {
217                 return TDB_SUCCESS;
218         }
219
220         blk = off / PAGESIZE;
221         off = off % PAGESIZE;
222
223         if (tdb->transaction->num_blocks <= blk) {
224                 uint8_t **new_blocks;
225                 /* expand the blocks array */
226                 if (tdb->transaction->blocks == NULL) {
227                         new_blocks = (uint8_t **)malloc(
228                                 (blk+1)*sizeof(uint8_t *));
229                 } else {
230                         new_blocks = (uint8_t **)realloc(
231                                 tdb->transaction->blocks,
232                                 (blk+1)*sizeof(uint8_t *));
233                 }
234                 if (new_blocks == NULL) {
235                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
236                                            "transaction_write:"
237                                            " failed to allocate");
238                         goto fail;
239                 }
240                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
241                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
242                 tdb->transaction->blocks = new_blocks;
243                 tdb->transaction->num_blocks = blk+1;
244                 tdb->transaction->last_block_size = 0;
245         }
246
247         /* allocate and fill a block? */
248         if (tdb->transaction->blocks[blk] == NULL) {
249                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
250                 if (tdb->transaction->blocks[blk] == NULL) {
251                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
252                                            "transaction_write:"
253                                            " failed to allocate");
254                         goto fail;
255                 }
256                 if (tdb->transaction->old_map_size > blk * PAGESIZE) {
257                         tdb_len_t len2 = PAGESIZE;
258                         if (len2 + (blk * PAGESIZE) > tdb->transaction->old_map_size) {
259                                 len2 = tdb->transaction->old_map_size - (blk * PAGESIZE);
260                         }
261                         ecode = tdb->transaction->io_methods->tread(tdb,
262                                         blk * PAGESIZE,
263                                         tdb->transaction->blocks[blk],
264                                         len2);
265                         if (ecode != TDB_SUCCESS) {
266                                 ecode = tdb_logerr(tdb, ecode,
267                                                    TDB_LOG_ERROR,
268                                                    "transaction_write:"
269                                                    " failed to"
270                                                    " read old block: %s",
271                                                    strerror(errno));
272                                 SAFE_FREE(tdb->transaction->blocks[blk]);
273                                 goto fail;
274                         }
275                         if (blk == tdb->transaction->num_blocks-1) {
276                                 tdb->transaction->last_block_size = len2;
277                         }
278                 }
279         }
280
281         /* overwrite part of an existing block */
282         if (buf == NULL) {
283                 memset(tdb->transaction->blocks[blk] + off, 0, len);
284         } else {
285                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
286         }
287         if (blk == tdb->transaction->num_blocks-1) {
288                 if (len + off > tdb->transaction->last_block_size) {
289                         tdb->transaction->last_block_size = len + off;
290                 }
291         }
292
293         return TDB_SUCCESS;
294
295 fail:
296         tdb->transaction->transaction_error = 1;
297         return ecode;
298 }
299
300
301 /*
302   write while in a transaction - this variant never expands the transaction blocks, it only
303   updates existing blocks. This means it cannot change the recovery size
304 */
305 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
306                                        const void *buf, tdb_len_t len)
307 {
308         size_t blk;
309
310         /* break it up into block sized chunks */
311         while (len + (off % PAGESIZE) > PAGESIZE) {
312                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
313                 transaction_write_existing(tdb, off, buf, len2);
314                 len -= len2;
315                 off += len2;
316                 if (buf != NULL) {
317                         buf = (const void *)(len2 + (const char *)buf);
318                 }
319         }
320
321         if (len == 0) {
322                 return;
323         }
324
325         blk = off / PAGESIZE;
326         off = off % PAGESIZE;
327
328         if (tdb->transaction->num_blocks <= blk ||
329             tdb->transaction->blocks[blk] == NULL) {
330                 return;
331         }
332
333         if (blk == tdb->transaction->num_blocks-1 &&
334             off + len > tdb->transaction->last_block_size) {
335                 if (off >= tdb->transaction->last_block_size) {
336                         return;
337                 }
338                 len = tdb->transaction->last_block_size - off;
339         }
340
341         /* overwrite part of an existing block */
342         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
343 }
344
345
346 /*
347   out of bounds check during a transaction
348 */
349 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
350                                       bool probe)
351 {
352         if (len <= tdb->file->map_size) {
353                 return TDB_SUCCESS;
354         }
355         if (!probe) {
356                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
357                            "tdb_oob len %lld beyond transaction size %lld",
358                            (long long)len,
359                            (long long)tdb->file->map_size);
360         }
361         return TDB_ERR_IO;
362 }
363
364 /*
365   transaction version of tdb_expand().
366 */
367 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
368                                               tdb_off_t addition)
369 {
370         enum TDB_ERROR ecode;
371
372         /* add a write to the transaction elements, so subsequent
373            reads see the zero data */
374         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
375         if (ecode == TDB_SUCCESS) {
376                 tdb->file->map_size += addition;
377         }
378         return ecode;
379 }
380
381 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
382                                 size_t len, bool write_mode)
383 {
384         size_t blk = off / PAGESIZE, end_blk;
385
386         /* This is wrong for zero-length blocks, but will fail gracefully */
387         end_blk = (off + len - 1) / PAGESIZE;
388
389         /* Can only do direct if in single block and we've already copied. */
390         if (write_mode) {
391                 if (blk != end_blk)
392                         return NULL;
393                 if (blk >= tdb->transaction->num_blocks)
394                         return NULL;
395                 if (tdb->transaction->blocks[blk] == NULL)
396                         return NULL;
397                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
398         }
399
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->transaction->num_blocks
403             && tdb->transaction->blocks[blk])
404                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->transaction->num_blocks)
409                         break;
410                 if (tdb->transaction->blocks[blk])
411                         return NULL;
412                 blk++;
413         }
414         return tdb->transaction->io_methods->direct(tdb, off, len, false);
415 }
416
417 static const struct tdb_methods transaction_methods = {
418         transaction_read,
419         transaction_write,
420         transaction_oob,
421         transaction_expand_file,
422         transaction_direct,
423 };
424
425 /*
426   sync to disk
427 */
428 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
429                                        tdb_off_t offset, tdb_len_t length)
430 {
431         if (tdb->flags & TDB_NOSYNC) {
432                 return TDB_SUCCESS;
433         }
434
435         if (fsync(tdb->file->fd) != 0) {
436                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
437                                   "tdb_transaction: fsync failed: %s",
438                                   strerror(errno));
439         }
440 #ifdef MS_SYNC
441         if (tdb->file->map_ptr) {
442                 tdb_off_t moffset = offset & ~(PAGESIZE-1);
443                 if (msync(moffset + (char *)tdb->file->map_ptr,
444                           length + (offset - moffset), MS_SYNC) != 0) {
445                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
446                                           "tdb_transaction: msync failed: %s",
447                                           strerror(errno));
448                 }
449         }
450 #endif
451         return TDB_SUCCESS;
452 }
453
454
455 static void _tdb_transaction_cancel(struct tdb_context *tdb)
456 {
457         int i;
458         enum TDB_ERROR ecode;
459
460         if (tdb->transaction == NULL) {
461                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
462                            "tdb_transaction_cancel: no transaction");
463                 return;
464         }
465
466         if (tdb->transaction->nesting != 0) {
467                 tdb->transaction->transaction_error = 1;
468                 tdb->transaction->nesting--;
469                 return;
470         }
471
472         tdb->file->map_size = tdb->transaction->old_map_size;
473
474         /* free all the transaction blocks */
475         for (i=0;i<tdb->transaction->num_blocks;i++) {
476                 if (tdb->transaction->blocks[i] != NULL) {
477                         free(tdb->transaction->blocks[i]);
478                 }
479         }
480         SAFE_FREE(tdb->transaction->blocks);
481
482         if (tdb->transaction->magic_offset) {
483                 const struct tdb_methods *methods = tdb->transaction->io_methods;
484                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
485
486                 /* remove the recovery marker */
487                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
488                                         &invalid, sizeof(invalid));
489                 if (ecode == TDB_SUCCESS)
490                         ecode = transaction_sync(tdb,
491                                                  tdb->transaction->magic_offset,
492                                                  sizeof(invalid));
493                 if (ecode != TDB_SUCCESS) {
494                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
495                                    "tdb_transaction_cancel: failed to remove"
496                                    " recovery magic");
497                 }
498         }
499
500         if (tdb->file->allrecord_lock.count)
501                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
502
503         /* restore the normal io methods */
504         tdb->methods = tdb->transaction->io_methods;
505
506         tdb_transaction_unlock(tdb, F_WRLCK);
507
508         if (tdb_has_open_lock(tdb))
509                 tdb_unlock_open(tdb);
510
511         SAFE_FREE(tdb->transaction);
512 }
513
514 /*
515   start a tdb transaction. No token is returned, as only a single
516   transaction is allowed to be pending per tdb_context
517 */
518 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
519 {
520         enum TDB_ERROR ecode;
521
522         /* some sanity checks */
523         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
524                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
525                                                     TDB_LOG_USE_ERROR,
526                                                     "tdb_transaction_start:"
527                                                     " cannot start a"
528                                                     " transaction on a "
529                                                     "read-only or internal db");
530         }
531
532         /* cope with nested tdb_transaction_start() calls */
533         if (tdb->transaction != NULL) {
534                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO,
535                                                     TDB_LOG_USE_ERROR,
536                                                     "tdb_transaction_start:"
537                                                     " already inside"
538                                                     " transaction");
539         }
540
541         if (tdb_has_hash_locks(tdb)) {
542                 /* the caller must not have any locks when starting a
543                    transaction as otherwise we'll be screwed by lack
544                    of nested locks in POSIX */
545                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
546                                                     TDB_LOG_USE_ERROR,
547                                                     "tdb_transaction_start:"
548                                                     " cannot start a"
549                                                     " transaction with locks"
550                                                     " held");
551         }
552
553         tdb->transaction = (struct tdb_transaction *)
554                 calloc(sizeof(struct tdb_transaction), 1);
555         if (tdb->transaction == NULL) {
556                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
557                                                     TDB_LOG_ERROR,
558                                                     "tdb_transaction_start:"
559                                                     " cannot allocate");
560         }
561
562         /* get the transaction write lock. This is a blocking lock. As
563            discussed with Volker, there are a number of ways we could
564            make this async, which we will probably do in the future */
565         ecode = tdb_transaction_lock(tdb, F_WRLCK);
566         if (ecode != TDB_SUCCESS) {
567                 SAFE_FREE(tdb->transaction->blocks);
568                 SAFE_FREE(tdb->transaction);
569                 return tdb->last_error = ecode;
570         }
571
572         /* get a read lock over entire file. This is upgraded to a write
573            lock during the commit */
574         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
575         if (ecode != TDB_SUCCESS) {
576                 goto fail_allrecord_lock;
577         }
578
579         /* make sure we know about any file expansions already done by
580            anyone else */
581         tdb->methods->oob(tdb, tdb->file->map_size + 1, true);
582         tdb->transaction->old_map_size = tdb->file->map_size;
583
584         /* finally hook the io methods, replacing them with
585            transaction specific methods */
586         tdb->transaction->io_methods = tdb->methods;
587         tdb->methods = &transaction_methods;
588         return tdb->last_error = TDB_SUCCESS;
589
590 fail_allrecord_lock:
591         tdb_transaction_unlock(tdb, F_WRLCK);
592         SAFE_FREE(tdb->transaction->blocks);
593         SAFE_FREE(tdb->transaction);
594         return tdb->last_error = ecode;
595 }
596
597
/*
  Public entry point for cancelling the current transaction; simply
  forwards to _tdb_transaction_cancel().
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
        _tdb_transaction_cancel(tdb);
}
605
606 /*
607   work out how much space the linearised recovery data will consume
608 */
609 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
610 {
611         tdb_len_t recovery_size = 0;
612         int i;
613
614         recovery_size = sizeof(tdb_len_t);
615         for (i=0;i<tdb->transaction->num_blocks;i++) {
616                 if (i * PAGESIZE >= tdb->transaction->old_map_size) {
617                         break;
618                 }
619                 if (tdb->transaction->blocks[i] == NULL) {
620                         continue;
621                 }
622                 recovery_size += 2*sizeof(tdb_off_t);
623                 if (i == tdb->transaction->num_blocks-1) {
624                         recovery_size += tdb->transaction->last_block_size;
625                 } else {
626                         recovery_size += PAGESIZE;
627                 }
628         }
629
630         return recovery_size;
631 }
632
633 /*
634   allocate the recovery area, or use an existing recovery area if it is
635   large enough
636 */
637 static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb,
638                                             tdb_len_t *recovery_size,
639                                             tdb_off_t *recovery_offset,
640                                             tdb_len_t *recovery_max_size)
641 {
642         struct tdb_recovery_record rec;
643         const struct tdb_methods *methods = tdb->transaction->io_methods;
644         tdb_off_t recovery_head;
645         size_t addition;
646         enum TDB_ERROR ecode;
647
648         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
649         if (TDB_OFF_IS_ERR(recovery_head)) {
650                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
651                                   "tdb_recovery_allocate:"
652                                   " failed to read recovery head");
653         }
654
655         if (recovery_head != 0) {
656                 ecode = methods->tread(tdb, recovery_head, &rec, sizeof(rec));
657                 if (ecode != TDB_SUCCESS) {
658                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
659                                           "tdb_recovery_allocate:"
660                                           " failed to read recovery record");
661                 }
662                 tdb_convert(tdb, &rec, sizeof(rec));
663                 /* ignore invalid recovery regions: can happen in crash */
664                 if (rec.magic != TDB_RECOVERY_MAGIC &&
665                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
666                         recovery_head = 0;
667                 }
668         }
669
670         *recovery_size = tdb_recovery_size(tdb);
671
672         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
673                 /* it fits in the existing area */
674                 *recovery_max_size = rec.max_len;
675                 *recovery_offset = recovery_head;
676                 return TDB_SUCCESS;
677         }
678
679         /* we need to free up the old recovery area, then allocate a
680            new one at the end of the file. Note that we cannot use
681            normal allocation to allocate the new one as that might return
682            us an area that is being currently used (as of the start of
683            the transaction) */
684         if (recovery_head != 0) {
685                 add_stat(tdb, frees, 1);
686                 ecode = add_free_record(tdb, recovery_head,
687                                         sizeof(rec) + rec.max_len);
688                 if (ecode != TDB_SUCCESS) {
689                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
690                                           "tdb_recovery_allocate:"
691                                           " failed to free previous"
692                                           " recovery area");
693                 }
694         }
695
696         /* the tdb_free() call might have increased the recovery size */
697         *recovery_size = tdb_recovery_size(tdb);
698
699         /* round up to a multiple of page size */
700         *recovery_max_size
701                 = (((sizeof(rec) + *recovery_size) + PAGESIZE-1)
702                    & ~(PAGESIZE-1))
703                 - sizeof(rec);
704         *recovery_offset = tdb->file->map_size;
705         recovery_head = *recovery_offset;
706
707         /* Restore ->map_size before calling underlying expand_file.
708            Also so that we don't try to expand the file again in the
709            transaction commit, which would destroy the recovery
710            area */
711         addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
712                 sizeof(rec) + *recovery_max_size;
713         tdb->file->map_size = tdb->transaction->old_map_size;
714         ecode = methods->expand_file(tdb, addition);
715         if (ecode != TDB_SUCCESS) {
716                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
717                                   "tdb_recovery_allocate:"
718                                   " failed to create recovery area");
719         }
720
721         /* we have to reset the old map size so that we don't try to
722            expand the file again in the transaction commit, which
723            would destroy the recovery area */
724         tdb->transaction->old_map_size = tdb->file->map_size;
725
726         /* write the recovery header offset and sync - we can sync without a race here
727            as the magic ptr in the recovery record has not been set */
728         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
729         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
730                                 &recovery_head, sizeof(tdb_off_t));
731         if (ecode != TDB_SUCCESS) {
732                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
733                                   "tdb_recovery_allocate:"
734                                   " failed to write recovery head");
735         }
736         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
737                                    &recovery_head,
738                                    sizeof(tdb_off_t));
739         return TDB_SUCCESS;
740 }
741
742 /* Set up header for the recovery record. */
743 static void set_recovery_header(struct tdb_recovery_record *rec,
744                                 uint64_t magic,
745                                 uint64_t datalen, uint64_t actuallen,
746                                 uint64_t oldsize)
747 {
748         rec->magic = magic;
749         rec->max_len = actuallen;
750         rec->len = datalen;
751         rec->eof = oldsize;
752 }
753
754 /*
755   setup the recovery data that will be used on a crash during commit
756 */
757 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb,
758                                                  tdb_off_t *magic_offset)
759 {
760         /* Initialized for GCC's 4.4.5 overzealous uninitialized warnings. */
761         tdb_len_t recovery_size = 0;
762         tdb_off_t recovery_offset = 0, recovery_max_size = 0;
763         unsigned char *data, *p;
764         const struct tdb_methods *methods = tdb->transaction->io_methods;
765         struct tdb_recovery_record *rec;
766         tdb_off_t old_map_size = tdb->transaction->old_map_size;
767         uint64_t magic, tailer;
768         int i;
769         enum TDB_ERROR ecode;
770
771         /*
772           check that the recovery area has enough space
773         */
774         ecode = tdb_recovery_allocate(tdb, &recovery_size,
775                                       &recovery_offset, &recovery_max_size);
776         if (ecode != TDB_SUCCESS) {
777                 return ecode;
778         }
779
780         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
781         if (data == NULL) {
782                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
783                                   "transaction_setup_recovery:"
784                                   " cannot allocate");
785         }
786
787         rec = (struct tdb_recovery_record *)data;
788         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
789                             recovery_size, recovery_max_size, old_map_size);
790         tdb_convert(tdb, rec, sizeof(*rec));
791
792         /* build the recovery data into a single blob to allow us to do a single
793            large write, which should be more efficient */
794         p = data + sizeof(*rec);
795         for (i=0;i<tdb->transaction->num_blocks;i++) {
796                 tdb_off_t offset;
797                 tdb_len_t length;
798
799                 if (tdb->transaction->blocks[i] == NULL) {
800                         continue;
801                 }
802
803                 offset = i * PAGESIZE;
804                 length = PAGESIZE;
805                 if (i == tdb->transaction->num_blocks-1) {
806                         length = tdb->transaction->last_block_size;
807                 }
808
809                 if (offset >= old_map_size) {
810                         continue;
811                 }
812                 if (offset + length > tdb->file->map_size) {
813                         free(data);
814                         return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
815                                           "tdb_transaction_setup_recovery:"
816                                           " transaction data over new region"
817                                           " boundary");
818                 }
819                 memcpy(p, &offset, sizeof(offset));
820                 memcpy(p + sizeof(offset), &length, sizeof(length));
821                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
822
823                 /* the recovery area contains the old data, not the
824                    new data, so we have to call the original tdb_read
825                    method to get it */
826                 ecode = methods->tread(tdb, offset,
827                                        p + sizeof(offset) + sizeof(length),
828                                        length);
829                 if (ecode != TDB_SUCCESS) {
830                         free(data);
831                         return ecode;
832                 }
833                 p += sizeof(offset) + sizeof(length) + length;
834         }
835
836         /* and the tailer */
837         tailer = sizeof(*rec) + recovery_max_size;
838         memcpy(p, &tailer, sizeof(tailer));
839         tdb_convert(tdb, p, sizeof(tailer));
840
841         /* write the recovery data to the recovery area */
842         ecode = methods->twrite(tdb, recovery_offset, data,
843                                 sizeof(*rec) + recovery_size);
844         if (ecode != TDB_SUCCESS) {
845                 free(data);
846                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
847                                   "tdb_transaction_setup_recovery:"
848                                   " failed to write recovery data");
849         }
850         transaction_write_existing(tdb, recovery_offset, data,
851                                    sizeof(*rec) + recovery_size);
852
853         /* as we don't have ordered writes, we have to sync the recovery
854            data before we update the magic to indicate that the recovery
855            data is present */
856         ecode = transaction_sync(tdb, recovery_offset,
857                                  sizeof(*rec) + recovery_size);
858         if (ecode != TDB_SUCCESS) {
859                 free(data);
860                 return ecode;
861         }
862
863         free(data);
864
865         magic = TDB_RECOVERY_MAGIC;
866         tdb_convert(tdb, &magic, sizeof(magic));
867
868         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
869                                                    magic);
870
871         ecode = methods->twrite(tdb, *magic_offset, &magic, sizeof(magic));
872         if (ecode != TDB_SUCCESS) {
873                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
874                                   "tdb_transaction_setup_recovery:"
875                                   " failed to write recovery magic");
876         }
877         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
878
879         /* ensure the recovery magic marker is on disk */
880         return transaction_sync(tdb, *magic_offset, sizeof(magic));
881 }
882
883 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
884 {
885         const struct tdb_methods *methods;
886         enum TDB_ERROR ecode;
887
888         if (tdb->transaction == NULL) {
889                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
890                                   "tdb_transaction_prepare_commit:"
891                                   " no transaction");
892         }
893
894         if (tdb->transaction->prepared) {
895                 _tdb_transaction_cancel(tdb);
896                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
897                                   "tdb_transaction_prepare_commit:"
898                                   " transaction already prepared");
899         }
900
901         if (tdb->transaction->transaction_error) {
902                 _tdb_transaction_cancel(tdb);
903                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
904                                   "tdb_transaction_prepare_commit:"
905                                   " transaction error pending");
906         }
907
908
909         if (tdb->transaction->nesting != 0) {
910                 tdb->transaction->nesting--;
911                 return TDB_SUCCESS;
912         }
913
914         /* check for a null transaction */
915         if (tdb->transaction->blocks == NULL) {
916                 return TDB_SUCCESS;
917         }
918
919         methods = tdb->transaction->io_methods;
920
921         /* upgrade the main transaction lock region to a write lock */
922         ecode = tdb_allrecord_upgrade(tdb);
923         if (ecode != TDB_SUCCESS) {
924                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
925                          "tdb_transaction_prepare_commit:"
926                          " failed to upgrade hash locks");
927                 _tdb_transaction_cancel(tdb);
928                 return ecode;
929         }
930
931         /* get the open lock - this prevents new users attaching to the database
932            during the commit */
933         ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
934         if (ecode != TDB_SUCCESS) {
935                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
936                            "tdb_transaction_prepare_commit:"
937                            " failed to get open lock");
938                 _tdb_transaction_cancel(tdb);
939                 return ecode;
940         }
941
942         /* Since we have whole db locked, we don't need the expansion lock. */
943         if (!(tdb->flags & TDB_NOSYNC)) {
944                 /* write the recovery data to the end of the file */
945                 ecode = transaction_setup_recovery(tdb,
946                                                    &tdb->transaction
947                                                    ->magic_offset);
948                 if (ecode != TDB_SUCCESS) {
949                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
950                                  "tdb_transaction_prepare_commit:"
951                                  " failed to setup recovery data");
952                         _tdb_transaction_cancel(tdb);
953                         return ecode;
954                 }
955         }
956
957         tdb->transaction->prepared = true;
958
959         /* expand the file to the new size if needed */
960         if (tdb->file->map_size != tdb->transaction->old_map_size) {
961                 tdb_len_t add;
962
963                 add = tdb->file->map_size - tdb->transaction->old_map_size;
964                 /* Restore original map size for tdb_expand_file */
965                 tdb->file->map_size = tdb->transaction->old_map_size;
966                 ecode = methods->expand_file(tdb, add);
967                 if (ecode != TDB_SUCCESS) {
968                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
969                                  "tdb_transaction_prepare_commit:"
970                                  " expansion failed");
971                         _tdb_transaction_cancel(tdb);
972                         return ecode;
973                 }
974         }
975
976         /* Keep the open lock until the actual commit */
977         return TDB_SUCCESS;
978 }
979
980 /*
981    prepare to commit the current transaction
982 */
983 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
984 {
985         return _tdb_transaction_prepare_commit(tdb);
986 }
987
988 /*
989   commit the current transaction
990 */
991 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
992 {
993         const struct tdb_methods *methods;
994         int i;
995         enum TDB_ERROR ecode;
996
997         if (tdb->transaction == NULL) {
998                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
999                                                     TDB_LOG_USE_ERROR,
1000                                                     "tdb_transaction_commit:"
1001                                                     " no transaction");
1002         }
1003
1004         tdb_trace(tdb, "tdb_transaction_commit");
1005
1006         if (tdb->transaction->nesting != 0) {
1007                 tdb->transaction->nesting--;
1008                 return tdb->last_error = TDB_SUCCESS;
1009         }
1010
1011         /* check for a null transaction */
1012         if (tdb->transaction->blocks == NULL) {
1013                 _tdb_transaction_cancel(tdb);
1014                 return tdb->last_error = TDB_SUCCESS;
1015         }
1016
1017         if (!tdb->transaction->prepared) {
1018                 ecode = _tdb_transaction_prepare_commit(tdb);
1019                 if (ecode != TDB_SUCCESS)
1020                         return tdb->last_error = ecode;
1021         }
1022
1023         methods = tdb->transaction->io_methods;
1024
1025         /* perform all the writes */
1026         for (i=0;i<tdb->transaction->num_blocks;i++) {
1027                 tdb_off_t offset;
1028                 tdb_len_t length;
1029
1030                 if (tdb->transaction->blocks[i] == NULL) {
1031                         continue;
1032                 }
1033
1034                 offset = i * PAGESIZE;
1035                 length = PAGESIZE;
1036                 if (i == tdb->transaction->num_blocks-1) {
1037                         length = tdb->transaction->last_block_size;
1038                 }
1039
1040                 ecode = methods->twrite(tdb, offset,
1041                                         tdb->transaction->blocks[i], length);
1042                 if (ecode != TDB_SUCCESS) {
1043                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1044                                    "tdb_transaction_commit:"
1045                                    " write failed during commit");
1046
1047                         /* we've overwritten part of the data and
1048                            possibly expanded the file, so we need to
1049                            run the crash recovery code */
1050                         tdb->methods = methods;
1051                         tdb_transaction_recover(tdb);
1052
1053                         _tdb_transaction_cancel(tdb);
1054
1055                         return tdb->last_error = ecode;
1056                 }
1057                 SAFE_FREE(tdb->transaction->blocks[i]);
1058         }
1059
1060         SAFE_FREE(tdb->transaction->blocks);
1061         tdb->transaction->num_blocks = 0;
1062
1063         /* ensure the new data is on disk */
1064         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1065         if (ecode != TDB_SUCCESS) {
1066                 return tdb->last_error = ecode;
1067         }
1068
1069         /*
1070           TODO: maybe write to some dummy hdr field, or write to magic
1071           offset without mmap, before the last sync, instead of the
1072           utime() call
1073         */
1074
1075         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1076            don't change the mtime of the file, this means the file may
1077            not be backed up (as tdb rounding to block sizes means that
1078            file size changes are quite rare too). The following forces
1079            mtime changes when a transaction completes */
1080 #if HAVE_UTIME
1081         utime(tdb->name, NULL);
1082 #endif
1083
1084         /* use a transaction cancel to free memory and remove the
1085            transaction locks: it "restores" map_size, too. */
1086         tdb->transaction->old_map_size = tdb->file->map_size;
1087         _tdb_transaction_cancel(tdb);
1088
1089         return tdb->last_error = TDB_SUCCESS;
1090 }
1091
1092
1093 /*
1094   recover from an aborted transaction. Must be called with exclusive
1095   database write access already established (including the open
1096   lock to prevent new processes attaching)
1097 */
1098 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1099 {
1100         tdb_off_t recovery_head, recovery_eof;
1101         unsigned char *data, *p;
1102         struct tdb_recovery_record rec;
1103         enum TDB_ERROR ecode;
1104
1105         /* find the recovery area */
1106         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1107         if (TDB_OFF_IS_ERR(recovery_head)) {
1108                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1109                                   "tdb_transaction_recover:"
1110                                   " failed to read recovery head");
1111         }
1112
1113         if (recovery_head == 0) {
1114                 /* we have never allocated a recovery record */
1115                 return TDB_SUCCESS;
1116         }
1117
1118         /* read the recovery record */
1119         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1120         if (ecode != TDB_SUCCESS) {
1121                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1122                                   "tdb_transaction_recover:"
1123                                   " failed to read recovery record");
1124         }
1125
1126         if (rec.magic != TDB_RECOVERY_MAGIC) {
1127                 /* there is no valid recovery data */
1128                 return TDB_SUCCESS;
1129         }
1130
1131         if (tdb->read_only) {
1132                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1133                                   "tdb_transaction_recover:"
1134                                   " attempt to recover read only database");
1135         }
1136
1137         recovery_eof = rec.eof;
1138
1139         data = (unsigned char *)malloc(rec.len);
1140         if (data == NULL) {
1141                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1142                                   "tdb_transaction_recover:"
1143                                   " failed to allocate recovery data");
1144         }
1145
1146         /* read the full recovery data */
1147         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1148                                     rec.len);
1149         if (ecode != TDB_SUCCESS) {
1150                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1151                                   "tdb_transaction_recover:"
1152                                   " failed to read recovery data");
1153         }
1154
1155         /* recover the file data */
1156         p = data;
1157         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1158                 tdb_off_t ofs;
1159                 tdb_len_t len;
1160                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1161                 memcpy(&ofs, p, sizeof(ofs));
1162                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1163                 p += sizeof(ofs) + sizeof(len);
1164
1165                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1166                 if (ecode != TDB_SUCCESS) {
1167                         free(data);
1168                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1169                                           "tdb_transaction_recover:"
1170                                           " failed to recover %zu bytes"
1171                                           " at offset %zu",
1172                                           (size_t)len, (size_t)ofs);
1173                 }
1174                 p += len;
1175         }
1176
1177         free(data);
1178
1179         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1180         if (ecode != TDB_SUCCESS) {
1181                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1182                                   "tdb_transaction_recover:"
1183                                   " failed to sync recovery");
1184         }
1185
1186         /* if the recovery area is after the recovered eof then remove it */
1187         if (recovery_eof <= recovery_head) {
1188                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1189                                                     recovery),
1190                                       0);
1191                 if (ecode != TDB_SUCCESS) {
1192                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1193                                           "tdb_transaction_recover:"
1194                                           " failed to remove recovery head");
1195                 }
1196         }
1197
1198         /* remove the recovery magic */
1199         ecode = tdb_write_off(tdb,
1200                               recovery_head
1201                               + offsetof(struct tdb_recovery_record, magic),
1202                               TDB_RECOVERY_INVALID_MAGIC);
1203         if (ecode != TDB_SUCCESS) {
1204                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1205                                   "tdb_transaction_recover:"
1206                                   " failed to remove recovery magic");
1207         }
1208
1209         ecode = transaction_sync(tdb, 0, recovery_eof);
1210         if (ecode != TDB_SUCCESS) {
1211                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1212                                   "tdb_transaction_recover:"
1213                                   " failed to sync2 recovery");
1214         }
1215
1216         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1217                    "tdb_transaction_recover: recovered %zu byte database",
1218                    (size_t)recovery_eof);
1219
1220         /* all done */
1221         return TDB_SUCCESS;
1222 }
1223
1224 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1225 {
1226         tdb_off_t recovery_head;
1227         struct tdb_recovery_record rec;
1228         enum TDB_ERROR ecode;
1229
1230         /* find the recovery area */
1231         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1232         if (TDB_OFF_IS_ERR(recovery_head)) {
1233                 return recovery_head;
1234         }
1235
1236         if (recovery_head == 0) {
1237                 /* we have never allocated a recovery record */
1238                 return false;
1239         }
1240
1241         /* read the recovery record */
1242         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1243         if (ecode != TDB_SUCCESS) {
1244                 return ecode;
1245         }
1246
1247         return (rec.magic == TDB_RECOVERY_MAGIC);
1248 }