]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
604861b738e75e9c7efab447f7d26d19780a5ffc
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free a heap pointer and reset it to NULL; free(NULL) is a no-op, so
   invoking this on an already-cleared pointer is harmless. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
129                                        void *buf, tdb_len_t len)
130 {
131         size_t blk;
132         enum TDB_ERROR ecode;
133
134         /* break it down into block sized ops */
135         while (len + (off % getpagesize()) > getpagesize()) {
136                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
137                 ecode = transaction_read(tdb, off, buf, len2);
138                 if (ecode != TDB_SUCCESS) {
139                         return ecode;
140                 }
141                 len -= len2;
142                 off += len2;
143                 buf = (void *)(len2 + (char *)buf);
144         }
145
146         if (len == 0) {
147                 return TDB_SUCCESS;
148         }
149
150         blk = off / getpagesize();
151
152         /* see if we have it in the block list */
153         if (tdb->transaction->num_blocks <= blk ||
154             tdb->transaction->blocks[blk] == NULL) {
155                 /* nope, do a real read */
156                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
157                 if (ecode != TDB_SUCCESS) {
158                         goto fail;
159                 }
160                 return 0;
161         }
162
163         /* it is in the block list. Now check for the last block */
164         if (blk == tdb->transaction->num_blocks-1) {
165                 if (len > tdb->transaction->last_block_size) {
166                         ecode = TDB_ERR_IO;
167                         goto fail;
168                 }
169         }
170
171         /* now copy it out of this block */
172         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
173         return TDB_SUCCESS;
174
175 fail:
176         tdb->transaction->transaction_error = 1;
177         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
178                           "transaction_read: failed at off=%zu len=%zu",
179                           (size_t)off, (size_t)len);
180 }
181
182
183 /*
184   write while in a transaction
185 */
186 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
187                                         const void *buf, tdb_len_t len)
188 {
189         size_t blk;
190         enum TDB_ERROR ecode;
191
192         /* Only a commit is allowed on a prepared transaction */
193         if (tdb->transaction->prepared) {
194                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
195                                    "transaction_write: transaction already"
196                                    " prepared, write not allowed");
197                 goto fail;
198         }
199
200         /* break it up into block sized chunks */
201         while (len + (off % getpagesize()) > getpagesize()) {
202                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
203                 ecode = transaction_write(tdb, off, buf, len2);
204                 if (ecode != TDB_SUCCESS) {
205                         return -1;
206                 }
207                 len -= len2;
208                 off += len2;
209                 if (buf != NULL) {
210                         buf = (const void *)(len2 + (const char *)buf);
211                 }
212         }
213
214         if (len == 0) {
215                 return TDB_SUCCESS;
216         }
217
218         blk = off / getpagesize();
219         off = off % getpagesize();
220
221         if (tdb->transaction->num_blocks <= blk) {
222                 uint8_t **new_blocks;
223                 /* expand the blocks array */
224                 if (tdb->transaction->blocks == NULL) {
225                         new_blocks = (uint8_t **)malloc(
226                                 (blk+1)*sizeof(uint8_t *));
227                 } else {
228                         new_blocks = (uint8_t **)realloc(
229                                 tdb->transaction->blocks,
230                                 (blk+1)*sizeof(uint8_t *));
231                 }
232                 if (new_blocks == NULL) {
233                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
234                                            "transaction_write:"
235                                            " failed to allocate");
236                         goto fail;
237                 }
238                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
239                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240                 tdb->transaction->blocks = new_blocks;
241                 tdb->transaction->num_blocks = blk+1;
242                 tdb->transaction->last_block_size = 0;
243         }
244
245         /* allocate and fill a block? */
246         if (tdb->transaction->blocks[blk] == NULL) {
247                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
248                 if (tdb->transaction->blocks[blk] == NULL) {
249                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
250                                            "transaction_write:"
251                                            " failed to allocate");
252                         goto fail;
253                 }
254                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
255                         tdb_len_t len2 = getpagesize();
256                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
257                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
258                         }
259                         ecode = tdb->transaction->io_methods->tread(tdb,
260                                         blk * getpagesize(),
261                                         tdb->transaction->blocks[blk],
262                                         len2);
263                         if (ecode != TDB_SUCCESS) {
264                                 ecode = tdb_logerr(tdb, ecode,
265                                                    TDB_LOG_ERROR,
266                                                    "transaction_write:"
267                                                    " failed to"
268                                                    " read old block: %s",
269                                                    strerror(errno));
270                                 SAFE_FREE(tdb->transaction->blocks[blk]);
271                                 goto fail;
272                         }
273                         if (blk == tdb->transaction->num_blocks-1) {
274                                 tdb->transaction->last_block_size = len2;
275                         }
276                 }
277         }
278
279         /* overwrite part of an existing block */
280         if (buf == NULL) {
281                 memset(tdb->transaction->blocks[blk] + off, 0, len);
282         } else {
283                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
284         }
285         if (blk == tdb->transaction->num_blocks-1) {
286                 if (len + off > tdb->transaction->last_block_size) {
287                         tdb->transaction->last_block_size = len + off;
288                 }
289         }
290
291         return TDB_SUCCESS;
292
293 fail:
294         tdb->transaction->transaction_error = 1;
295         return ecode;
296 }
297
298
299 /*
300   write while in a transaction - this varient never expands the transaction blocks, it only
301   updates existing blocks. This means it cannot change the recovery size
302 */
303 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
304                                        const void *buf, tdb_len_t len)
305 {
306         size_t blk;
307
308         /* break it up into block sized chunks */
309         while (len + (off % getpagesize()) > getpagesize()) {
310                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
311                 transaction_write_existing(tdb, off, buf, len2);
312                 len -= len2;
313                 off += len2;
314                 if (buf != NULL) {
315                         buf = (const void *)(len2 + (const char *)buf);
316                 }
317         }
318
319         if (len == 0) {
320                 return;
321         }
322
323         blk = off / getpagesize();
324         off = off % getpagesize();
325
326         if (tdb->transaction->num_blocks <= blk ||
327             tdb->transaction->blocks[blk] == NULL) {
328                 return;
329         }
330
331         if (blk == tdb->transaction->num_blocks-1 &&
332             off + len > tdb->transaction->last_block_size) {
333                 if (off >= tdb->transaction->last_block_size) {
334                         return;
335                 }
336                 len = tdb->transaction->last_block_size - off;
337         }
338
339         /* overwrite part of an existing block */
340         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
341 }
342
343
344 /*
345   out of bounds check during a transaction
346 */
347 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
348                                       bool probe)
349 {
350         if (len <= tdb->map_size) {
351                 return TDB_SUCCESS;
352         }
353         if (!probe) {
354                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
355                            "tdb_oob len %lld beyond transaction size %lld",
356                            (long long)len,
357                            (long long)tdb->map_size);
358         }
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->map_size, NULL, addition);
373         if (ecode != TDB_SUCCESS) {
374                 tdb->ecode = ecode;
375                 return ecode;
376         }
377         tdb->map_size += addition;
378         return ecode;
379 }
380
381 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
382                                 size_t len, bool write_mode)
383 {
384         size_t blk = off / getpagesize(), end_blk;
385
386         /* This is wrong for zero-length blocks, but will fail gracefully */
387         end_blk = (off + len - 1) / getpagesize();
388
389         /* Can only do direct if in single block and we've already copied. */
390         if (write_mode) {
391                 if (blk != end_blk)
392                         return NULL;
393                 if (blk >= tdb->transaction->num_blocks)
394                         return NULL;
395                 if (tdb->transaction->blocks[blk] == NULL)
396                         return NULL;
397                 return tdb->transaction->blocks[blk] + off % getpagesize();
398         }
399
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->transaction->num_blocks
403             && tdb->transaction->blocks[blk])
404                 return tdb->transaction->blocks[blk] + off % getpagesize();
405
406         /* Otherwise must be all not copied. */
407         while (blk < end_blk) {
408                 if (blk >= tdb->transaction->num_blocks)
409                         break;
410                 if (tdb->transaction->blocks[blk])
411                         return NULL;
412                 blk++;
413         }
414         return tdb->transaction->io_methods->direct(tdb, off, len, false);
415 }
416
417 static const struct tdb_methods transaction_methods = {
418         transaction_read,
419         transaction_write,
420         transaction_oob,
421         transaction_expand_file,
422         transaction_direct,
423 };
424
425 /*
426   sync to disk
427 */
428 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
429                                        tdb_off_t offset, tdb_len_t length)
430 {
431         if (tdb->flags & TDB_NOSYNC) {
432                 return TDB_SUCCESS;
433         }
434
435         if (fsync(tdb->fd) != 0) {
436                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
437                                   "tdb_transaction: fsync failed: %s",
438                                   strerror(errno));
439         }
440 #ifdef MS_SYNC
441         if (tdb->map_ptr) {
442                 tdb_off_t moffset = offset & ~(getpagesize()-1);
443                 if (msync(moffset + (char *)tdb->map_ptr,
444                           length + (offset - moffset), MS_SYNC) != 0) {
445                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
446                                           "tdb_transaction: msync failed: %s",
447                                           strerror(errno));
448                 }
449         }
450 #endif
451         return TDB_SUCCESS;
452 }
453
454
455 static void _tdb_transaction_cancel(struct tdb_context *tdb)
456 {
457         int i;
458         enum TDB_ERROR ecode;
459
460         if (tdb->transaction == NULL) {
461                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
462                            "tdb_transaction_cancel: no transaction");
463                 return;
464         }
465
466         if (tdb->transaction->nesting != 0) {
467                 tdb->transaction->transaction_error = 1;
468                 tdb->transaction->nesting--;
469                 return;
470         }
471
472         tdb->map_size = tdb->transaction->old_map_size;
473
474         /* free all the transaction blocks */
475         for (i=0;i<tdb->transaction->num_blocks;i++) {
476                 if (tdb->transaction->blocks[i] != NULL) {
477                         free(tdb->transaction->blocks[i]);
478                 }
479         }
480         SAFE_FREE(tdb->transaction->blocks);
481
482         if (tdb->transaction->magic_offset) {
483                 const struct tdb_methods *methods = tdb->transaction->io_methods;
484                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
485
486                 /* remove the recovery marker */
487                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
488                                         &invalid, sizeof(invalid));
489                 if (ecode == TDB_SUCCESS)
490                         ecode = transaction_sync(tdb,
491                                                  tdb->transaction->magic_offset,
492                                                  sizeof(invalid));
493                 if (ecode != TDB_SUCCESS) {
494                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
495                                    "tdb_transaction_cancel: failed to remove"
496                                    " recovery magic");
497                 }
498         }
499
500         if (tdb->allrecord_lock.count)
501                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
502
503         /* restore the normal io methods */
504         tdb->methods = tdb->transaction->io_methods;
505
506         tdb_transaction_unlock(tdb, F_WRLCK);
507
508         if (tdb_has_open_lock(tdb))
509                 tdb_unlock_open(tdb);
510
511         SAFE_FREE(tdb->transaction);
512 }
513
514 /*
515   start a tdb transaction. No token is returned, as only a single
516   transaction is allowed to be pending per tdb_context
517 */
518 int tdb_transaction_start(struct tdb_context *tdb)
519 {
520         enum TDB_ERROR ecode;
521
522         /* some sanity checks */
523         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
524                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
525                            "tdb_transaction_start: cannot start a transaction"
526                            " on a read-only or internal db");
527                 return -1;
528         }
529
530         /* cope with nested tdb_transaction_start() calls */
531         if (tdb->transaction != NULL) {
532                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_USE_ERROR,
533                            "tdb_transaction_start:"
534                            " already inside transaction");
535                 return -1;
536         }
537
538         if (tdb_has_hash_locks(tdb)) {
539                 /* the caller must not have any locks when starting a
540                    transaction as otherwise we'll be screwed by lack
541                    of nested locks in posix */
542                 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
543                            "tdb_transaction_start: cannot start a transaction"
544                            " with locks held");
545                 return -1;
546         }
547
548         tdb->transaction = (struct tdb_transaction *)
549                 calloc(sizeof(struct tdb_transaction), 1);
550         if (tdb->transaction == NULL) {
551                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
552                            "tdb_transaction_start: cannot allocate");
553                 return -1;
554         }
555
556         /* get the transaction write lock. This is a blocking lock. As
557            discussed with Volker, there are a number of ways we could
558            make this async, which we will probably do in the future */
559         ecode = tdb_transaction_lock(tdb, F_WRLCK);
560         if (ecode != TDB_SUCCESS) {
561                 tdb->ecode = ecode;
562                 SAFE_FREE(tdb->transaction->blocks);
563                 SAFE_FREE(tdb->transaction);
564                 return -1;
565         }
566
567         /* get a read lock over entire file. This is upgraded to a write
568            lock during the commit */
569         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
570         if (ecode != TDB_SUCCESS) {
571                 tdb->ecode = ecode;
572                 goto fail_allrecord_lock;
573         }
574
575         /* make sure we know about any file expansions already done by
576            anyone else */
577         tdb->methods->oob(tdb, tdb->map_size + 1, true);
578         tdb->transaction->old_map_size = tdb->map_size;
579
580         /* finally hook the io methods, replacing them with
581            transaction specific methods */
582         tdb->transaction->io_methods = tdb->methods;
583         tdb->methods = &transaction_methods;
584         return 0;
585
586 fail_allrecord_lock:
587         tdb_transaction_unlock(tdb, F_WRLCK);
588         SAFE_FREE(tdb->transaction->blocks);
589         SAFE_FREE(tdb->transaction);
590         return -1;
591 }
592
593
/*
  cancel the current transaction (public entry point; the work is
  done by _tdb_transaction_cancel)
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
        _tdb_transaction_cancel(tdb);
}
601
602 /*
603   work out how much space the linearised recovery data will consume
604 */
605 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
606 {
607         tdb_len_t recovery_size = 0;
608         int i;
609
610         recovery_size = sizeof(tdb_len_t);
611         for (i=0;i<tdb->transaction->num_blocks;i++) {
612                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
613                         break;
614                 }
615                 if (tdb->transaction->blocks[i] == NULL) {
616                         continue;
617                 }
618                 recovery_size += 2*sizeof(tdb_off_t);
619                 if (i == tdb->transaction->num_blocks-1) {
620                         recovery_size += tdb->transaction->last_block_size;
621                 } else {
622                         recovery_size += getpagesize();
623                 }
624         }
625
626         return recovery_size;
627 }
628
629 /*
630   allocate the recovery area, or use an existing recovery area if it is
631   large enough
632 */
633 static int tdb_recovery_allocate(struct tdb_context *tdb,
634                                  tdb_len_t *recovery_size,
635                                  tdb_off_t *recovery_offset,
636                                  tdb_len_t *recovery_max_size)
637 {
638         struct tdb_recovery_record rec;
639         const struct tdb_methods *methods = tdb->transaction->io_methods;
640         tdb_off_t recovery_head;
641         size_t addition;
642         enum TDB_ERROR ecode;
643
644         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
645         if (TDB_OFF_IS_ERR(recovery_head)) {
646                 tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
647                          "tdb_recovery_allocate:"
648                          " failed to read recovery head");
649                 return -1;
650         }
651
652         if (recovery_head != 0) {
653                 ecode = methods->tread(tdb, recovery_head, &rec, sizeof(rec));
654                 if (ecode != TDB_SUCCESS) {
655                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
656                                  "tdb_recovery_allocate:"
657                                  " failed to read recovery record");
658                         return -1;
659                 }
660                 tdb_convert(tdb, &rec, sizeof(rec));
661                 /* ignore invalid recovery regions: can happen in crash */
662                 if (rec.magic != TDB_RECOVERY_MAGIC &&
663                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
664                         recovery_head = 0;
665                 }
666         }
667
668         *recovery_size = tdb_recovery_size(tdb);
669
670         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
671                 /* it fits in the existing area */
672                 *recovery_max_size = rec.max_len;
673                 *recovery_offset = recovery_head;
674                 return 0;
675         }
676
677         /* we need to free up the old recovery area, then allocate a
678            new one at the end of the file. Note that we cannot use
679            normal allocation to allocate the new one as that might return
680            us an area that is being currently used (as of the start of
681            the transaction) */
682         if (recovery_head != 0) {
683                 add_stat(tdb, frees, 1);
684                 ecode = add_free_record(tdb, recovery_head,
685                                         sizeof(rec) + rec.max_len);
686                 if (ecode != TDB_SUCCESS) {
687                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
688                                    "tdb_recovery_allocate:"
689                                    " failed to free previous recovery area");
690                         return -1;
691                 }
692         }
693
694         /* the tdb_free() call might have increased the recovery size */
695         *recovery_size = tdb_recovery_size(tdb);
696
697         /* round up to a multiple of page size */
698         *recovery_max_size
699                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
700                    & ~(getpagesize()-1))
701                 - sizeof(rec);
702         *recovery_offset = tdb->map_size;
703         recovery_head = *recovery_offset;
704
705         /* Restore ->map_size before calling underlying expand_file.
706            Also so that we don't try to expand the file again in the
707            transaction commit, which would destroy the recovery
708            area */
709         addition = (tdb->map_size - tdb->transaction->old_map_size) +
710                 sizeof(rec) + *recovery_max_size;
711         tdb->map_size = tdb->transaction->old_map_size;
712         ecode = methods->expand_file(tdb, addition);
713         if (ecode != TDB_SUCCESS) {
714                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
715                          "tdb_recovery_allocate:"
716                          " failed to create recovery area");
717                 return -1;
718         }
719
720         /* we have to reset the old map size so that we don't try to
721            expand the file again in the transaction commit, which
722            would destroy the recovery area */
723         tdb->transaction->old_map_size = tdb->map_size;
724
725         /* write the recovery header offset and sync - we can sync without a race here
726            as the magic ptr in the recovery record has not been set */
727         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
728         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
729                                 &recovery_head, sizeof(tdb_off_t));
730         if (ecode != TDB_SUCCESS) {
731                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
732                          "tdb_recovery_allocate:"
733                          " failed to write recovery head");
734                 return -1;
735         }
736         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
737                                    &recovery_head,
738                                    sizeof(tdb_off_t));
739         return 0;
740 }
741
742 /* Set up header for the recovery record. */
743 static void set_recovery_header(struct tdb_recovery_record *rec,
744                                 uint64_t magic,
745                                 uint64_t datalen, uint64_t actuallen,
746                                 uint64_t oldsize)
747 {
748         rec->magic = magic;
749         rec->max_len = actuallen;
750         rec->len = datalen;
751         rec->eof = oldsize;
752 }
753
754 /*
755   setup the recovery data that will be used on a crash during commit
756 */
757 static int transaction_setup_recovery(struct tdb_context *tdb,
758                                       tdb_off_t *magic_offset)
759 {
760         tdb_len_t recovery_size;
761         unsigned char *data, *p;
762         const struct tdb_methods *methods = tdb->transaction->io_methods;
763         struct tdb_recovery_record *rec;
764         tdb_off_t recovery_offset, recovery_max_size;
765         tdb_off_t old_map_size = tdb->transaction->old_map_size;
766         uint64_t magic, tailer;
767         int i;
768         enum TDB_ERROR ecode;
769
770         /*
771           check that the recovery area has enough space
772         */
773         if (tdb_recovery_allocate(tdb, &recovery_size,
774                                   &recovery_offset, &recovery_max_size) == -1) {
775                 return -1;
776         }
777
778         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
779         if (data == NULL) {
780                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
781                            "transaction_setup_recovery: cannot allocate");
782                 return -1;
783         }
784
785         rec = (struct tdb_recovery_record *)data;
786         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
787                             recovery_size, recovery_max_size, old_map_size);
788         tdb_convert(tdb, rec, sizeof(*rec));
789
790         /* build the recovery data into a single blob to allow us to do a single
791            large write, which should be more efficient */
792         p = data + sizeof(*rec);
793         for (i=0;i<tdb->transaction->num_blocks;i++) {
794                 tdb_off_t offset;
795                 tdb_len_t length;
796
797                 if (tdb->transaction->blocks[i] == NULL) {
798                         continue;
799                 }
800
801                 offset = i * getpagesize();
802                 length = getpagesize();
803                 if (i == tdb->transaction->num_blocks-1) {
804                         length = tdb->transaction->last_block_size;
805                 }
806
807                 if (offset >= old_map_size) {
808                         continue;
809                 }
810                 if (offset + length > tdb->map_size) {
811                         tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
812                                    "tdb_transaction_setup_recovery:"
813                                    " transaction data over new region boundary");
814                         free(data);
815                         return -1;
816                 }
817                 memcpy(p, &offset, sizeof(offset));
818                 memcpy(p + sizeof(offset), &length, sizeof(length));
819                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
820
821                 /* the recovery area contains the old data, not the
822                    new data, so we have to call the original tdb_read
823                    method to get it */
824                 ecode = methods->tread(tdb, offset,
825                                        p + sizeof(offset) + sizeof(length),
826                                        length);
827                 if (ecode != TDB_SUCCESS) {
828                         tdb->ecode = ecode;
829                         free(data);
830                         return -1;
831                 }
832                 p += sizeof(offset) + sizeof(length) + length;
833         }
834
835         /* and the tailer */
836         tailer = sizeof(*rec) + recovery_max_size;
837         memcpy(p, &tailer, sizeof(tailer));
838         tdb_convert(tdb, p, sizeof(tailer));
839
840         /* write the recovery data to the recovery area */
841         ecode = methods->twrite(tdb, recovery_offset, data,
842                                 sizeof(*rec) + recovery_size);
843         if (ecode != TDB_SUCCESS) {
844                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
845                          "tdb_transaction_setup_recovery:"
846                          " failed to write recovery data");
847                 free(data);
848                 return -1;
849         }
850         transaction_write_existing(tdb, recovery_offset, data,
851                                    sizeof(*rec) + recovery_size);
852
853         /* as we don't have ordered writes, we have to sync the recovery
854            data before we update the magic to indicate that the recovery
855            data is present */
856         ecode = transaction_sync(tdb, recovery_offset,
857                                  sizeof(*rec) + recovery_size);
858         if (ecode != TDB_SUCCESS) {
859                 free(data);
860                 tdb->ecode = ecode;
861                 return -1;
862         }
863
864         free(data);
865
866         magic = TDB_RECOVERY_MAGIC;
867         tdb_convert(tdb, &magic, sizeof(magic));
868
869         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
870                                                    magic);
871
872         ecode = methods->twrite(tdb, *magic_offset, &magic, sizeof(magic));
873         if (ecode != TDB_SUCCESS) {
874                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
875                          "tdb_transaction_setup_recovery:"
876                          " failed to write recovery magic");
877                 return -1;
878         }
879         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
880
881         /* ensure the recovery magic marker is on disk */
882         ecode = transaction_sync(tdb, *magic_offset, sizeof(magic));
883         if (ecode != TDB_SUCCESS) {
884                 tdb->ecode = ecode;
885                 return -1;
886         }
887
888         return 0;
889 }
890
891 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
892 {
893         const struct tdb_methods *methods;
894         enum TDB_ERROR ecode;
895
896         if (tdb->transaction == NULL) {
897                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
898                            "tdb_transaction_prepare_commit: no transaction");
899                 return -1;
900         }
901
902         if (tdb->transaction->prepared) {
903                 _tdb_transaction_cancel(tdb);
904                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
905                            "tdb_transaction_prepare_commit:"
906                            " transaction already prepared");
907                 return -1;
908         }
909
910         if (tdb->transaction->transaction_error) {
911                 _tdb_transaction_cancel(tdb);
912                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
913                            "tdb_transaction_prepare_commit:"
914                            " transaction error pending");
915                 return -1;
916         }
917
918
919         if (tdb->transaction->nesting != 0) {
920                 tdb->transaction->nesting--;
921                 return 0;
922         }
923
924         /* check for a null transaction */
925         if (tdb->transaction->blocks == NULL) {
926                 return 0;
927         }
928
929         methods = tdb->transaction->io_methods;
930
931         /* upgrade the main transaction lock region to a write lock */
932         ecode = tdb_allrecord_upgrade(tdb);
933         if (ecode != TDB_SUCCESS) {
934                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
935                          "tdb_transaction_prepare_commit:"
936                          " failed to upgrade hash locks");
937                 _tdb_transaction_cancel(tdb);
938                 return -1;
939         }
940
941         /* get the open lock - this prevents new users attaching to the database
942            during the commit */
943         ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
944         if (ecode != TDB_SUCCESS) {
945                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
946                            "tdb_transaction_prepare_commit:"
947                            " failed to get open lock");
948                 _tdb_transaction_cancel(tdb);
949                 return -1;
950         }
951
952         /* Since we have whole db locked, we don't need the expansion lock. */
953         if (!(tdb->flags & TDB_NOSYNC)) {
954                 /* write the recovery data to the end of the file */
955                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
956                         tdb_logerr(tdb, tdb->ecode, TDB_LOG_ERROR,
957                                  "tdb_transaction_prepare_commit:"
958                                  " failed to setup recovery data");
959                         _tdb_transaction_cancel(tdb);
960                         return -1;
961                 }
962         }
963
964         tdb->transaction->prepared = true;
965
966         /* expand the file to the new size if needed */
967         if (tdb->map_size != tdb->transaction->old_map_size) {
968                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
969                 /* Restore original map size for tdb_expand_file */
970                 tdb->map_size = tdb->transaction->old_map_size;
971                 ecode = methods->expand_file(tdb, add);
972                 if (ecode != TDB_SUCCESS) {
973                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
974                                  "tdb_transaction_prepare_commit:"
975                                  " expansion failed");
976                         _tdb_transaction_cancel(tdb);
977                         return -1;
978                 }
979         }
980
981         /* Keep the open lock until the actual commit */
982
983         return 0;
984 }
985
/*
   Public entry point for preparing the current transaction for commit.
   Delegates to _tdb_transaction_prepare_commit() and hands its result
   back unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
        int ret = _tdb_transaction_prepare_commit(tdb);

        return ret;
}
993
994 /*
995   commit the current transaction
996 */
997 int tdb_transaction_commit(struct tdb_context *tdb)
998 {
999         const struct tdb_methods *methods;
1000         int i;
1001         enum TDB_ERROR ecode;
1002
1003         if (tdb->transaction == NULL) {
1004                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1005                          "tdb_transaction_commit: no transaction");
1006                 return -1;
1007         }
1008
1009         tdb_trace(tdb, "tdb_transaction_commit");
1010
1011         if (tdb->transaction->nesting != 0) {
1012                 tdb->transaction->nesting--;
1013                 return 0;
1014         }
1015
1016         /* check for a null transaction */
1017         if (tdb->transaction->blocks == NULL) {
1018                 _tdb_transaction_cancel(tdb);
1019                 return 0;
1020         }
1021
1022         if (!tdb->transaction->prepared) {
1023                 int ret = _tdb_transaction_prepare_commit(tdb);
1024                 if (ret)
1025                         return ret;
1026         }
1027
1028         methods = tdb->transaction->io_methods;
1029
1030         /* perform all the writes */
1031         for (i=0;i<tdb->transaction->num_blocks;i++) {
1032                 tdb_off_t offset;
1033                 tdb_len_t length;
1034
1035                 if (tdb->transaction->blocks[i] == NULL) {
1036                         continue;
1037                 }
1038
1039                 offset = i * getpagesize();
1040                 length = getpagesize();
1041                 if (i == tdb->transaction->num_blocks-1) {
1042                         length = tdb->transaction->last_block_size;
1043                 }
1044
1045                 ecode = methods->twrite(tdb, offset,
1046                                         tdb->transaction->blocks[i], length);
1047                 if (ecode != TDB_SUCCESS) {
1048                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1049                                    "tdb_transaction_commit:"
1050                                    " write failed during commit");
1051
1052                         /* we've overwritten part of the data and
1053                            possibly expanded the file, so we need to
1054                            run the crash recovery code */
1055                         tdb->methods = methods;
1056                         tdb_transaction_recover(tdb);
1057
1058                         _tdb_transaction_cancel(tdb);
1059
1060                         return -1;
1061                 }
1062                 SAFE_FREE(tdb->transaction->blocks[i]);
1063         }
1064
1065         SAFE_FREE(tdb->transaction->blocks);
1066         tdb->transaction->num_blocks = 0;
1067
1068         /* ensure the new data is on disk */
1069         ecode = transaction_sync(tdb, 0, tdb->map_size);
1070         if (ecode != TDB_SUCCESS) {
1071                 tdb->ecode = ecode;
1072                 return -1;
1073         }
1074
1075         /*
1076           TODO: maybe write to some dummy hdr field, or write to magic
1077           offset without mmap, before the last sync, instead of the
1078           utime() call
1079         */
1080
1081         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1082            don't change the mtime of the file, this means the file may
1083            not be backed up (as tdb rounding to block sizes means that
1084            file size changes are quite rare too). The following forces
1085            mtime changes when a transaction completes */
1086 #if HAVE_UTIME
1087         utime(tdb->name, NULL);
1088 #endif
1089
1090         /* use a transaction cancel to free memory and remove the
1091            transaction locks */
1092         _tdb_transaction_cancel(tdb);
1093
1094         return 0;
1095 }
1096
1097
1098 /*
1099   recover from an aborted transaction. Must be called with exclusive
1100   database write access already established (including the open
1101   lock to prevent new processes attaching)
1102 */
1103 int tdb_transaction_recover(struct tdb_context *tdb)
1104 {
1105         tdb_off_t recovery_head, recovery_eof;
1106         unsigned char *data, *p;
1107         struct tdb_recovery_record rec;
1108         enum TDB_ERROR ecode;
1109
1110         /* find the recovery area */
1111         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1112         if (TDB_OFF_IS_ERR(recovery_head)) {
1113                 tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1114                          "tdb_transaction_recover:"
1115                          " failed to read recovery head");
1116                 return -1;
1117         }
1118
1119         if (recovery_head == 0) {
1120                 /* we have never allocated a recovery record */
1121                 return 0;
1122         }
1123
1124         /* read the recovery record */
1125         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1126         if (ecode != TDB_SUCCESS) {
1127                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1128                            "tdb_transaction_recover:"
1129                            " failed to read recovery record");
1130                 return -1;
1131         }
1132
1133         if (rec.magic != TDB_RECOVERY_MAGIC) {
1134                 /* there is no valid recovery data */
1135                 return 0;
1136         }
1137
1138         if (tdb->read_only) {
1139                 tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1140                            "tdb_transaction_recover:"
1141                            " attempt to recover read only database");
1142                 return -1;
1143         }
1144
1145         recovery_eof = rec.eof;
1146
1147         data = (unsigned char *)malloc(rec.len);
1148         if (data == NULL) {
1149                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1150                            "tdb_transaction_recover:"
1151                            " failed to allocate recovery data");
1152                 return -1;
1153         }
1154
1155         /* read the full recovery data */
1156         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1157                                     rec.len);
1158         if (ecode != TDB_SUCCESS) {
1159                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1160                            "tdb_transaction_recover:"
1161                            " failed to read recovery data");
1162                 return -1;
1163         }
1164
1165         /* recover the file data */
1166         p = data;
1167         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1168                 tdb_off_t ofs;
1169                 tdb_len_t len;
1170                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1171                 memcpy(&ofs, p, sizeof(ofs));
1172                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1173                 p += sizeof(ofs) + sizeof(len);
1174
1175                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1176                 if (ecode != TDB_SUCCESS) {
1177                         free(data);
1178                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1179                                  "tdb_transaction_recover:"
1180                                  " failed to recover %zu bytes at offset %zu",
1181                                  (size_t)len, (size_t)ofs);
1182                         return -1;
1183                 }
1184                 p += len;
1185         }
1186
1187         free(data);
1188
1189         ecode = transaction_sync(tdb, 0, tdb->map_size);
1190         if (ecode != TDB_SUCCESS) {
1191                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1192                            "tdb_transaction_recover: failed to sync recovery");
1193                 return -1;
1194         }
1195
1196         /* if the recovery area is after the recovered eof then remove it */
1197         if (recovery_eof <= recovery_head) {
1198                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1199                                                     recovery),
1200                                       0);
1201                 if (ecode != TDB_SUCCESS) {
1202                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1203                                  "tdb_transaction_recover:"
1204                                  " failed to remove recovery head");
1205                         return -1;
1206                 }
1207         }
1208
1209         /* remove the recovery magic */
1210         ecode = tdb_write_off(tdb,
1211                               recovery_head
1212                               + offsetof(struct tdb_recovery_record, magic),
1213                               TDB_RECOVERY_INVALID_MAGIC);
1214         if (ecode != TDB_SUCCESS) {
1215                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1216                          "tdb_transaction_recover:"
1217                          " failed to remove recovery magic");
1218                 return -1;
1219         }
1220
1221         ecode = transaction_sync(tdb, 0, recovery_eof);
1222         if (ecode != TDB_SUCCESS) {
1223                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1224                          "tdb_transaction_recover: failed to sync2 recovery");
1225                 return -1;
1226         }
1227
1228         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1229                    "tdb_transaction_recover: recovered %zu byte database",
1230                    (size_t)recovery_eof);
1231
1232         /* all done */
1233         return 0;
1234 }
1235
1236 /* Any I/O failures we say "needs recovery". */
1237 bool tdb_needs_recovery(struct tdb_context *tdb)
1238 {
1239         tdb_off_t recovery_head;
1240         struct tdb_recovery_record rec;
1241         enum TDB_ERROR ecode;
1242
1243         /* find the recovery area */
1244         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1245         if (TDB_OFF_IS_ERR(recovery_head)) {
1246                 tdb->ecode = recovery_head;
1247                 return true;
1248         }
1249
1250         if (recovery_head == 0) {
1251                 /* we have never allocated a recovery record */
1252                 return false;
1253         }
1254
1255         /* read the recovery record */
1256         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1257         if (ecode != TDB_SUCCESS) {
1258                 tdb->ecode = ecode;
1259                 return true;
1260         }
1261
1262         return (rec.magic == TDB_RECOVERY_MAGIC);
1263 }