]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
4bdc3f32d019b6abe6e9519be298b027a7b804e6
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
/*
  hold the context of any current transaction
*/
struct tdb_transaction {
        /* the original io methods - used to do IOs to the real db.
           Saved by tdb_transaction_start() and restored on cancel */
        const struct tdb_methods *io_methods;

        /* the list of transaction blocks. When a block is first
           written to, it gets created in this list. The array is
           indexed by (file offset / PAGESIZE); an entry stays NULL
           until that block is written inside the transaction */
        uint8_t **blocks;
        size_t num_blocks; /* number of entries in blocks[] */
        size_t last_block_size; /* number of valid bytes in the last block */

        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
           transaction is ended */
        int transaction_error;

        /* when inside a transaction we need to keep track of any
           nested tdb_transaction_start() calls, as these are allowed,
           but don't create a new transaction */
        unsigned int nesting;

        /* set when a prepare has already occurred; transaction_write()
           rejects further writes once this is set */
        bool prepared;
        /* when non-zero, the file offset at which cancel writes
           TDB_RECOVERY_INVALID_MAGIC to remove the recovery marker */
        tdb_off_t magic_offset;

        /* old file size before transaction. Cancel restores
           tdb->file->map_size from it; tdb_recovery_allocate() updates
           it after growing the file for a new recovery area */
        tdb_len_t old_map_size;
};
122
123 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
124 #define PAGESIZE 4096
125
126 /*
127   read while in a transaction. We need to check first if the data is in our list
128   of transaction elements, then if not do a real read
129 */
130 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
131                                        void *buf, tdb_len_t len)
132 {
133         size_t blk;
134         enum TDB_ERROR ecode;
135
136         /* break it down into block sized ops */
137         while (len + (off % PAGESIZE) > PAGESIZE) {
138                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
139                 ecode = transaction_read(tdb, off, buf, len2);
140                 if (ecode != TDB_SUCCESS) {
141                         return ecode;
142                 }
143                 len -= len2;
144                 off += len2;
145                 buf = (void *)(len2 + (char *)buf);
146         }
147
148         if (len == 0) {
149                 return TDB_SUCCESS;
150         }
151
152         blk = off / PAGESIZE;
153
154         /* see if we have it in the block list */
155         if (tdb->transaction->num_blocks <= blk ||
156             tdb->transaction->blocks[blk] == NULL) {
157                 /* nope, do a real read */
158                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
159                 if (ecode != TDB_SUCCESS) {
160                         goto fail;
161                 }
162                 return 0;
163         }
164
165         /* it is in the block list. Now check for the last block */
166         if (blk == tdb->transaction->num_blocks-1) {
167                 if (len > tdb->transaction->last_block_size) {
168                         ecode = TDB_ERR_IO;
169                         goto fail;
170                 }
171         }
172
173         /* now copy it out of this block */
174         memcpy(buf, tdb->transaction->blocks[blk] + (off % PAGESIZE), len);
175         return TDB_SUCCESS;
176
177 fail:
178         tdb->transaction->transaction_error = 1;
179         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
180                           "transaction_read: failed at off=%zu len=%zu",
181                           (size_t)off, (size_t)len);
182 }
183
184
185 /*
186   write while in a transaction
187 */
188 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
189                                         const void *buf, tdb_len_t len)
190 {
191         size_t blk;
192         enum TDB_ERROR ecode;
193
194         /* Only a commit is allowed on a prepared transaction */
195         if (tdb->transaction->prepared) {
196                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
197                                    "transaction_write: transaction already"
198                                    " prepared, write not allowed");
199                 goto fail;
200         }
201
202         /* break it up into block sized chunks */
203         while (len + (off % PAGESIZE) > PAGESIZE) {
204                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
205                 ecode = transaction_write(tdb, off, buf, len2);
206                 if (ecode != TDB_SUCCESS) {
207                         return -1;
208                 }
209                 len -= len2;
210                 off += len2;
211                 if (buf != NULL) {
212                         buf = (const void *)(len2 + (const char *)buf);
213                 }
214         }
215
216         if (len == 0) {
217                 return TDB_SUCCESS;
218         }
219
220         blk = off / PAGESIZE;
221         off = off % PAGESIZE;
222
223         if (tdb->transaction->num_blocks <= blk) {
224                 uint8_t **new_blocks;
225                 /* expand the blocks array */
226                 if (tdb->transaction->blocks == NULL) {
227                         new_blocks = (uint8_t **)malloc(
228                                 (blk+1)*sizeof(uint8_t *));
229                 } else {
230                         new_blocks = (uint8_t **)realloc(
231                                 tdb->transaction->blocks,
232                                 (blk+1)*sizeof(uint8_t *));
233                 }
234                 if (new_blocks == NULL) {
235                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
236                                            "transaction_write:"
237                                            " failed to allocate");
238                         goto fail;
239                 }
240                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
241                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
242                 tdb->transaction->blocks = new_blocks;
243                 tdb->transaction->num_blocks = blk+1;
244                 tdb->transaction->last_block_size = 0;
245         }
246
247         /* allocate and fill a block? */
248         if (tdb->transaction->blocks[blk] == NULL) {
249                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
250                 if (tdb->transaction->blocks[blk] == NULL) {
251                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
252                                            "transaction_write:"
253                                            " failed to allocate");
254                         goto fail;
255                 }
256                 if (tdb->transaction->old_map_size > blk * PAGESIZE) {
257                         tdb_len_t len2 = PAGESIZE;
258                         if (len2 + (blk * PAGESIZE) > tdb->transaction->old_map_size) {
259                                 len2 = tdb->transaction->old_map_size - (blk * PAGESIZE);
260                         }
261                         ecode = tdb->transaction->io_methods->tread(tdb,
262                                         blk * PAGESIZE,
263                                         tdb->transaction->blocks[blk],
264                                         len2);
265                         if (ecode != TDB_SUCCESS) {
266                                 ecode = tdb_logerr(tdb, ecode,
267                                                    TDB_LOG_ERROR,
268                                                    "transaction_write:"
269                                                    " failed to"
270                                                    " read old block: %s",
271                                                    strerror(errno));
272                                 SAFE_FREE(tdb->transaction->blocks[blk]);
273                                 goto fail;
274                         }
275                         if (blk == tdb->transaction->num_blocks-1) {
276                                 tdb->transaction->last_block_size = len2;
277                         }
278                 }
279         }
280
281         /* overwrite part of an existing block */
282         if (buf == NULL) {
283                 memset(tdb->transaction->blocks[blk] + off, 0, len);
284         } else {
285                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
286         }
287         if (blk == tdb->transaction->num_blocks-1) {
288                 if (len + off > tdb->transaction->last_block_size) {
289                         tdb->transaction->last_block_size = len + off;
290                 }
291         }
292
293         return TDB_SUCCESS;
294
295 fail:
296         tdb->transaction->transaction_error = 1;
297         return ecode;
298 }
299
300
301 /*
302   write while in a transaction - this variant never expands the transaction blocks, it only
303   updates existing blocks. This means it cannot change the recovery size
304 */
305 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
306                                        const void *buf, tdb_len_t len)
307 {
308         size_t blk;
309
310         /* break it up into block sized chunks */
311         while (len + (off % PAGESIZE) > PAGESIZE) {
312                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
313                 transaction_write_existing(tdb, off, buf, len2);
314                 len -= len2;
315                 off += len2;
316                 if (buf != NULL) {
317                         buf = (const void *)(len2 + (const char *)buf);
318                 }
319         }
320
321         if (len == 0) {
322                 return;
323         }
324
325         blk = off / PAGESIZE;
326         off = off % PAGESIZE;
327
328         if (tdb->transaction->num_blocks <= blk ||
329             tdb->transaction->blocks[blk] == NULL) {
330                 return;
331         }
332
333         if (blk == tdb->transaction->num_blocks-1 &&
334             off + len > tdb->transaction->last_block_size) {
335                 if (off >= tdb->transaction->last_block_size) {
336                         return;
337                 }
338                 len = tdb->transaction->last_block_size - off;
339         }
340
341         /* overwrite part of an existing block */
342         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
343 }
344
345
346 /*
347   out of bounds check during a transaction
348 */
349 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
350                                       bool probe)
351 {
352         if (len <= tdb->file->map_size) {
353                 return TDB_SUCCESS;
354         }
355         if (!probe) {
356                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
357                            "tdb_oob len %lld beyond transaction size %lld",
358                            (long long)len,
359                            (long long)tdb->file->map_size);
360         }
361         return TDB_ERR_IO;
362 }
363
364 /*
365   transaction version of tdb_expand().
366 */
367 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
368                                               tdb_off_t addition)
369 {
370         enum TDB_ERROR ecode;
371
372         /* add a write to the transaction elements, so subsequent
373            reads see the zero data */
374         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
375         if (ecode == TDB_SUCCESS) {
376                 tdb->file->map_size += addition;
377         }
378         return ecode;
379 }
380
381 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
382                                 size_t len, bool write_mode)
383 {
384         size_t blk = off / PAGESIZE, end_blk;
385
386         /* This is wrong for zero-length blocks, but will fail gracefully */
387         end_blk = (off + len - 1) / PAGESIZE;
388
389         /* Can only do direct if in single block and we've already copied. */
390         if (write_mode) {
391                 if (blk != end_blk)
392                         return NULL;
393                 if (blk >= tdb->transaction->num_blocks)
394                         return NULL;
395                 if (tdb->transaction->blocks[blk] == NULL)
396                         return NULL;
397                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
398         }
399
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->transaction->num_blocks
403             && tdb->transaction->blocks[blk])
404                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->transaction->num_blocks)
409                         break;
410                 if (tdb->transaction->blocks[blk])
411                         return NULL;
412                 blk++;
413         }
414         return tdb->transaction->io_methods->direct(tdb, off, len, false);
415 }
416
417 static const struct tdb_methods transaction_methods = {
418         transaction_read,
419         transaction_write,
420         transaction_oob,
421         transaction_expand_file,
422         transaction_direct,
423 };
424
425 /*
426   sync to disk
427 */
428 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
429                                        tdb_off_t offset, tdb_len_t length)
430 {
431         if (tdb->flags & TDB_NOSYNC) {
432                 return TDB_SUCCESS;
433         }
434
435         if (fsync(tdb->file->fd) != 0) {
436                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
437                                   "tdb_transaction: fsync failed: %s",
438                                   strerror(errno));
439         }
440 #ifdef MS_SYNC
441         if (tdb->file->map_ptr) {
442                 tdb_off_t moffset = offset & ~(PAGESIZE-1);
443                 if (msync(moffset + (char *)tdb->file->map_ptr,
444                           length + (offset - moffset), MS_SYNC) != 0) {
445                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
446                                           "tdb_transaction: msync failed: %s",
447                                           strerror(errno));
448                 }
449         }
450 #endif
451         return TDB_SUCCESS;
452 }
453
454
455 static void _tdb_transaction_cancel(struct tdb_context *tdb)
456 {
457         int i;
458         enum TDB_ERROR ecode;
459
460         if (tdb->transaction == NULL) {
461                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
462                            "tdb_transaction_cancel: no transaction");
463                 return;
464         }
465
466         if (tdb->transaction->nesting != 0) {
467                 tdb->transaction->transaction_error = 1;
468                 tdb->transaction->nesting--;
469                 return;
470         }
471
472         tdb->file->map_size = tdb->transaction->old_map_size;
473
474         /* free all the transaction blocks */
475         for (i=0;i<tdb->transaction->num_blocks;i++) {
476                 if (tdb->transaction->blocks[i] != NULL) {
477                         free(tdb->transaction->blocks[i]);
478                 }
479         }
480         SAFE_FREE(tdb->transaction->blocks);
481
482         if (tdb->transaction->magic_offset) {
483                 const struct tdb_methods *methods = tdb->transaction->io_methods;
484                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
485
486                 /* remove the recovery marker */
487                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
488                                         &invalid, sizeof(invalid));
489                 if (ecode == TDB_SUCCESS)
490                         ecode = transaction_sync(tdb,
491                                                  tdb->transaction->magic_offset,
492                                                  sizeof(invalid));
493                 if (ecode != TDB_SUCCESS) {
494                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
495                                    "tdb_transaction_cancel: failed to remove"
496                                    " recovery magic");
497                 }
498         }
499
500         if (tdb->file->allrecord_lock.count)
501                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
502
503         /* restore the normal io methods */
504         tdb->methods = tdb->transaction->io_methods;
505
506         tdb_transaction_unlock(tdb, F_WRLCK);
507
508         if (tdb_has_open_lock(tdb))
509                 tdb_unlock_open(tdb);
510
511         SAFE_FREE(tdb->transaction);
512 }
513
514 /*
515   start a tdb transaction. No token is returned, as only a single
516   transaction is allowed to be pending per tdb_context
517 */
518 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
519 {
520         enum TDB_ERROR ecode;
521
522         /* some sanity checks */
523         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
524                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
525                                                     TDB_LOG_USE_ERROR,
526                                                     "tdb_transaction_start:"
527                                                     " cannot start a"
528                                                     " transaction on a "
529                                                     "read-only or internal db");
530         }
531
532         /* cope with nested tdb_transaction_start() calls */
533         if (tdb->transaction != NULL) {
534                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
535                         return tdb->last_error
536                                 = tdb_logerr(tdb, TDB_ERR_IO,
537                                              TDB_LOG_USE_ERROR,
538                                              "tdb_transaction_start:"
539                                              " already inside transaction");
540                 }
541                 tdb->transaction->nesting++;
542                 return 0;
543         }
544
545         if (tdb_has_hash_locks(tdb)) {
546                 /* the caller must not have any locks when starting a
547                    transaction as otherwise we'll be screwed by lack
548                    of nested locks in POSIX */
549                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
550                                                     TDB_LOG_USE_ERROR,
551                                                     "tdb_transaction_start:"
552                                                     " cannot start a"
553                                                     " transaction with locks"
554                                                     " held");
555         }
556
557         tdb->transaction = (struct tdb_transaction *)
558                 calloc(sizeof(struct tdb_transaction), 1);
559         if (tdb->transaction == NULL) {
560                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
561                                                     TDB_LOG_ERROR,
562                                                     "tdb_transaction_start:"
563                                                     " cannot allocate");
564         }
565
566         /* get the transaction write lock. This is a blocking lock. As
567            discussed with Volker, there are a number of ways we could
568            make this async, which we will probably do in the future */
569         ecode = tdb_transaction_lock(tdb, F_WRLCK);
570         if (ecode != TDB_SUCCESS) {
571                 SAFE_FREE(tdb->transaction->blocks);
572                 SAFE_FREE(tdb->transaction);
573                 return tdb->last_error = ecode;
574         }
575
576         /* get a read lock over entire file. This is upgraded to a write
577            lock during the commit */
578         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
579         if (ecode != TDB_SUCCESS) {
580                 goto fail_allrecord_lock;
581         }
582
583         /* make sure we know about any file expansions already done by
584            anyone else */
585         tdb->methods->oob(tdb, tdb->file->map_size + 1, true);
586         tdb->transaction->old_map_size = tdb->file->map_size;
587
588         /* finally hook the io methods, replacing them with
589            transaction specific methods */
590         tdb->transaction->io_methods = tdb->methods;
591         tdb->methods = &transaction_methods;
592         return tdb->last_error = TDB_SUCCESS;
593
594 fail_allrecord_lock:
595         tdb_transaction_unlock(tdb, F_WRLCK);
596         SAFE_FREE(tdb->transaction->blocks);
597         SAFE_FREE(tdb->transaction);
598         return tdb->last_error = ecode;
599 }
600
601
/*
  cancel the current transaction.

  Public entry point: forwards straight to _tdb_transaction_cancel(),
  which does the actual work. No status is returned to the caller.
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
        _tdb_transaction_cancel(tdb);
}
609
610 /*
611   work out how much space the linearised recovery data will consume
612 */
613 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
614 {
615         tdb_len_t recovery_size = 0;
616         int i;
617
618         recovery_size = 0;
619         for (i=0;i<tdb->transaction->num_blocks;i++) {
620                 if (i * PAGESIZE >= tdb->transaction->old_map_size) {
621                         break;
622                 }
623                 if (tdb->transaction->blocks[i] == NULL) {
624                         continue;
625                 }
626                 recovery_size += 2*sizeof(tdb_off_t);
627                 if (i == tdb->transaction->num_blocks-1) {
628                         recovery_size += tdb->transaction->last_block_size;
629                 } else {
630                         recovery_size += PAGESIZE;
631                 }
632         }
633
634         return recovery_size;
635 }
636
/*
  allocate the recovery area, or use an existing recovery area if it is
  large enough.

  On success the three out-parameters are filled in:
    *recovery_size      bytes of recovery data needed, as computed by
                        tdb_recovery_size()
    *recovery_offset    file offset of the recovery record to use
    *recovery_max_size  capacity of that recovery area, not counting
                        the struct tdb_recovery_record header

  Returns TDB_SUCCESS, or a logged error code when reading the recovery
  head/record, freeing the old area, expanding the file or writing the
  new recovery head fails.
*/
static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb,
                                            tdb_len_t *recovery_size,
                                            tdb_off_t *recovery_offset,
                                            tdb_len_t *recovery_max_size)
{
        struct tdb_recovery_record rec;
        /* the original (non-transaction) io methods */
        const struct tdb_methods *methods = tdb->transaction->io_methods;
        tdb_off_t recovery_head;
        size_t addition;
        enum TDB_ERROR ecode;

        /* the header field 'recovery' holds the offset of the current
           recovery record; 0 is treated as "none" below */
        recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
        if (TDB_OFF_IS_ERR(recovery_head)) {
                return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
                                  "tdb_recovery_allocate:"
                                  " failed to read recovery head");
        }

        if (recovery_head != 0) {
                /* read the existing record through the original methods */
                ecode = methods->tread(tdb, recovery_head, &rec, sizeof(rec));
                if (ecode != TDB_SUCCESS) {
                        return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
                                          "tdb_recovery_allocate:"
                                          " failed to read recovery record");
                }
                tdb_convert(tdb, &rec, sizeof(rec));
                /* ignore invalid recovery regions: can happen in crash */
                if (rec.magic != TDB_RECOVERY_MAGIC &&
                    rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
                        recovery_head = 0;
                }
        }

        *recovery_size = tdb_recovery_size(tdb);

        /* rec is only examined when recovery_head != 0, i.e. when it
           was read (and validated) above */
        if (recovery_head != 0 && *recovery_size <= rec.max_len) {
                /* it fits in the existing area */
                *recovery_max_size = rec.max_len;
                *recovery_offset = recovery_head;
                return TDB_SUCCESS;
        }

        /* we need to free up the old recovery area, then allocate a
           new one at the end of the file. Note that we cannot use
           normal allocation to allocate the new one as that might return
           us an area that is being currently used (as of the start of
           the transaction) */
        if (recovery_head != 0) {
                tdb->stats.frees++;
                ecode = add_free_record(tdb, recovery_head,
                                        sizeof(rec) + rec.max_len,
                                        TDB_LOCK_WAIT, true);
                if (ecode != TDB_SUCCESS) {
                        return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
                                          "tdb_recovery_allocate:"
                                          " failed to free previous"
                                          " recovery area");
                }
        }

        /* freeing the old area (add_free_record() above) might have
           increased the recovery size, so compute it again */
        *recovery_size = tdb_recovery_size(tdb);

        /* round up to a multiple of page size. Overallocate, since each
         * such allocation forces us to expand the file. The record
         * header is included in the rounding and then subtracted again,
         * so header + area together end on a PAGESIZE multiple. */
        *recovery_max_size
                = (((sizeof(rec) + *recovery_size + *recovery_size / 2)
                    + PAGESIZE-1) & ~(PAGESIZE-1))
                - sizeof(rec);
        /* the new area starts at the current (in-transaction) end of file */
        *recovery_offset = tdb->file->map_size;
        recovery_head = *recovery_offset;

        /* Restore ->map_size before calling underlying expand_file.
           Also so that we don't try to expand the file again in the
           transaction commit, which would destroy the recovery
           area. The addition covers both the growth already done inside
           the transaction and the new recovery record + area */
        addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
                sizeof(rec) + *recovery_max_size;
        tdb->file->map_size = tdb->transaction->old_map_size;
        ecode = methods->expand_file(tdb, addition);
        if (ecode != TDB_SUCCESS) {
                return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
                                  "tdb_recovery_allocate:"
                                  " failed to create recovery area");
        }

        /* we have to reset the old map size so that we don't try to
           expand the file again in the transaction commit, which
           would destroy the recovery area */
        tdb->transaction->old_map_size = tdb->file->map_size;

        /* write the recovery header offset and sync - we can sync without a race here
           as the magic ptr in the recovery record has not been set */
        tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
        ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
                                &recovery_head, sizeof(tdb_off_t));
        if (ecode != TDB_SUCCESS) {
                return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
                                  "tdb_recovery_allocate:"
                                  " failed to write recovery head");
        }
        /* mirror the new head into the transaction's copy of the header
           block, if the transaction has one */
        transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
                                   &recovery_head,
                                   sizeof(tdb_off_t));
        return TDB_SUCCESS;
}
747
748 /* Set up header for the recovery record. */
749 static void set_recovery_header(struct tdb_recovery_record *rec,
750                                 uint64_t magic,
751                                 uint64_t datalen, uint64_t actuallen,
752                                 uint64_t oldsize)
753 {
754         rec->magic = magic;
755         rec->max_len = actuallen;
756         rec->len = datalen;
757         rec->eof = oldsize;
758 }
759
/*
  Count how many leading bytes of new[] and old[] are identical.

  Returns the index of the first byte that differs, or length when the
  first length bytes all match.
*/
static unsigned int same(const unsigned char *new,
                         const unsigned char *old,
                         unsigned int length)
{
        unsigned int matched = 0;

        while (matched < length && new[matched] == old[matched]) {
                matched++;
        }
        return matched;
}
772
/*
  Measure the run of "different" bytes at the start of new[] vs old[].

  Scanning stops at the first mismatch that is preceded by at least
  min_same consecutive matching bytes; shorter matching runs are
  treated as part of the differing region.

  new, old: buffers to compare, each at least length bytes.
  length:   number of bytes to compare.
  min_same: minimum length of a matching run that ends the region.
  samelen:  out; length of the matching run that follows the returned
            region (0 if no run of at least min_same bytes was found).

  Returns the length of the differing region.
*/
static unsigned int different(const unsigned char *new,
                              const unsigned char *old,
                              unsigned int length,
                              unsigned int min_same,
                              unsigned int *samelen)
{
        unsigned int pos = 0;
        unsigned int run = 0;   /* current count of consecutive matches */

        while (pos < length) {
                if (new[pos] == old[pos]) {
                        run++;
                } else if (run >= min_same) {
                        /* A long enough matching run just ended here. */
                        *samelen = run;
                        return pos - run;
                } else {
                        run = 0;
                }
                pos++;
        }

        /* A trailing run only counts if it is long enough. */
        if (run < min_same) {
                run = 0;
        }
        *samelen = run;
        return length - run;
}
797
798 /*
799   setup the recovery data that will be used on a crash during commit
800 */
801 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb,
802                                                  tdb_off_t *magic_offset)
803 {
804         /* Initialized for GCC's 4.4.5 overzealous uninitialized warnings. */
805         tdb_len_t recovery_size = 0;
806         tdb_off_t recovery_offset = 0, recovery_max_size = 0;
807         unsigned char *data, *p;
808         const struct tdb_methods *methods = tdb->transaction->io_methods;
809         struct tdb_recovery_record *rec;
810         tdb_off_t old_map_size = tdb->transaction->old_map_size;
811         uint64_t magic;
812         int i;
813         enum TDB_ERROR ecode;
814
815         /*
816           check that the recovery area has enough space
817         */
818         ecode = tdb_recovery_allocate(tdb, &recovery_size,
819                                       &recovery_offset, &recovery_max_size);
820         if (ecode != TDB_SUCCESS) {
821                 return ecode;
822         }
823
824         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
825         if (data == NULL) {
826                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
827                                   "transaction_setup_recovery:"
828                                   " cannot allocate");
829         }
830
831         rec = (struct tdb_recovery_record *)data;
832
833         /* build the recovery data into a single blob to allow us to do a single
834            large write, which should be more efficient */
835         p = data + sizeof(*rec);
836         for (i=0;i<tdb->transaction->num_blocks;i++) {
837                 tdb_off_t offset;
838                 tdb_len_t length;
839                 unsigned int off;
840                 unsigned char buffer[PAGESIZE];
841
842                 if (tdb->transaction->blocks[i] == NULL) {
843                         continue;
844                 }
845
846                 offset = i * PAGESIZE;
847                 length = PAGESIZE;
848                 if (i == tdb->transaction->num_blocks-1) {
849                         length = tdb->transaction->last_block_size;
850                 }
851
852                 if (offset >= old_map_size) {
853                         continue;
854                 }
855
856                 if (offset + length > tdb->file->map_size) {
857                         free(data);
858                         return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
859                                           "tdb_transaction_setup_recovery:"
860                                           " transaction data over new region"
861                                           " boundary");
862                 }
863                 if (offset + length > old_map_size) {
864                         /* Short read at EOF. */
865                         length = old_map_size - offset;
866                 }
867                 ecode = methods->tread(tdb, offset, buffer, length);
868                 if (ecode != TDB_SUCCESS) {
869                         free(data);
870                         return ecode;
871                 }
872
873                 /* Skip over anything the same at the start. */
874                 off = same(tdb->transaction->blocks[i], buffer, length);
875                 offset += off;
876
877                 while (off < length) {
878                         tdb_len_t len;
879                         unsigned int samelen;
880
881                         len = different(tdb->transaction->blocks[i] + off,
882                                         buffer + off, length - off,
883                                         sizeof(offset) + sizeof(len) + 1,
884                                         &samelen);
885
886                         memcpy(p, &offset, sizeof(offset));
887                         memcpy(p + sizeof(offset), &len, sizeof(len));
888                         tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
889                         p += sizeof(offset) + sizeof(len);
890                         memcpy(p, buffer + off, len);
891                         p += len;
892                         off += len + samelen;
893                         offset += len + samelen;
894                 }
895         }
896
897         /* Now we know size, set up rec header. */
898         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
899                             p - data - sizeof(*rec),
900                             recovery_max_size, old_map_size);
901         tdb_convert(tdb, rec, sizeof(*rec));
902
903         /* write the recovery data to the recovery area */
904         ecode = methods->twrite(tdb, recovery_offset, data, p - data);
905         if (ecode != TDB_SUCCESS) {
906                 free(data);
907                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
908                                   "tdb_transaction_setup_recovery:"
909                                   " failed to write recovery data");
910         }
911         transaction_write_existing(tdb, recovery_offset, data, p - data);
912
913         /* as we don't have ordered writes, we have to sync the recovery
914            data before we update the magic to indicate that the recovery
915            data is present */
916         ecode = transaction_sync(tdb, recovery_offset, p - data);
917         if (ecode != TDB_SUCCESS) {
918                 free(data);
919                 return ecode;
920         }
921
922         free(data);
923
924         magic = TDB_RECOVERY_MAGIC;
925         tdb_convert(tdb, &magic, sizeof(magic));
926
927         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
928                                                    magic);
929
930         ecode = methods->twrite(tdb, *magic_offset, &magic, sizeof(magic));
931         if (ecode != TDB_SUCCESS) {
932                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
933                                   "tdb_transaction_setup_recovery:"
934                                   " failed to write recovery magic");
935         }
936         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
937
938         /* ensure the recovery magic marker is on disk */
939         return transaction_sync(tdb, *magic_offset, sizeof(magic));
940 }
941
942 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
943 {
944         const struct tdb_methods *methods;
945         enum TDB_ERROR ecode;
946
947         if (tdb->transaction == NULL) {
948                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
949                                   "tdb_transaction_prepare_commit:"
950                                   " no transaction");
951         }
952
953         if (tdb->transaction->prepared) {
954                 _tdb_transaction_cancel(tdb);
955                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
956                                   "tdb_transaction_prepare_commit:"
957                                   " transaction already prepared");
958         }
959
960         if (tdb->transaction->transaction_error) {
961                 _tdb_transaction_cancel(tdb);
962                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
963                                   "tdb_transaction_prepare_commit:"
964                                   " transaction error pending");
965         }
966
967
968         if (tdb->transaction->nesting != 0) {
969                 return TDB_SUCCESS;
970         }
971
972         /* check for a null transaction */
973         if (tdb->transaction->blocks == NULL) {
974                 return TDB_SUCCESS;
975         }
976
977         methods = tdb->transaction->io_methods;
978
979         /* upgrade the main transaction lock region to a write lock */
980         ecode = tdb_allrecord_upgrade(tdb);
981         if (ecode != TDB_SUCCESS) {
982                 return ecode;
983         }
984
985         /* get the open lock - this prevents new users attaching to the database
986            during the commit */
987         ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
988         if (ecode != TDB_SUCCESS) {
989                 return ecode;
990         }
991
992         /* Since we have whole db locked, we don't need the expansion lock. */
993         if (!(tdb->flags & TDB_NOSYNC)) {
994                 /* write the recovery data to the end of the file */
995                 ecode = transaction_setup_recovery(tdb,
996                                                    &tdb->transaction
997                                                    ->magic_offset);
998                 if (ecode != TDB_SUCCESS) {
999                         return ecode;
1000                 }
1001         }
1002
1003         tdb->transaction->prepared = true;
1004
1005         /* expand the file to the new size if needed */
1006         if (tdb->file->map_size != tdb->transaction->old_map_size) {
1007                 tdb_len_t add;
1008
1009                 add = tdb->file->map_size - tdb->transaction->old_map_size;
1010                 /* Restore original map size for tdb_expand_file */
1011                 tdb->file->map_size = tdb->transaction->old_map_size;
1012                 ecode = methods->expand_file(tdb, add);
1013                 if (ecode != TDB_SUCCESS) {
1014                         return ecode;
1015                 }
1016         }
1017
1018         /* Keep the open lock until the actual commit */
1019         return TDB_SUCCESS;
1020 }
1021
1022 /*
1023    prepare to commit the current transaction
1024 */
1025 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1026 {
1027         return _tdb_transaction_prepare_commit(tdb);
1028 }
1029
1030 /*
1031   commit the current transaction
1032 */
1033 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1034 {
1035         const struct tdb_methods *methods;
1036         int i;
1037         enum TDB_ERROR ecode;
1038
1039         if (tdb->transaction == NULL) {
1040                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1041                                                     TDB_LOG_USE_ERROR,
1042                                                     "tdb_transaction_commit:"
1043                                                     " no transaction");
1044         }
1045
1046         tdb_trace(tdb, "tdb_transaction_commit");
1047
1048         if (tdb->transaction->nesting != 0) {
1049                 tdb->transaction->nesting--;
1050                 return tdb->last_error = TDB_SUCCESS;
1051         }
1052
1053         /* check for a null transaction */
1054         if (tdb->transaction->blocks == NULL) {
1055                 _tdb_transaction_cancel(tdb);
1056                 return tdb->last_error = TDB_SUCCESS;
1057         }
1058
1059         if (!tdb->transaction->prepared) {
1060                 ecode = _tdb_transaction_prepare_commit(tdb);
1061                 if (ecode != TDB_SUCCESS) {
1062                         _tdb_transaction_cancel(tdb);
1063                         return tdb->last_error = ecode;
1064                 }
1065         }
1066
1067         methods = tdb->transaction->io_methods;
1068
1069         /* perform all the writes */
1070         for (i=0;i<tdb->transaction->num_blocks;i++) {
1071                 tdb_off_t offset;
1072                 tdb_len_t length;
1073
1074                 if (tdb->transaction->blocks[i] == NULL) {
1075                         continue;
1076                 }
1077
1078                 offset = i * PAGESIZE;
1079                 length = PAGESIZE;
1080                 if (i == tdb->transaction->num_blocks-1) {
1081                         length = tdb->transaction->last_block_size;
1082                 }
1083
1084                 ecode = methods->twrite(tdb, offset,
1085                                         tdb->transaction->blocks[i], length);
1086                 if (ecode != TDB_SUCCESS) {
1087                         /* we've overwritten part of the data and
1088                            possibly expanded the file, so we need to
1089                            run the crash recovery code */
1090                         tdb->methods = methods;
1091                         tdb_transaction_recover(tdb);
1092
1093                         _tdb_transaction_cancel(tdb);
1094
1095                         return tdb->last_error = ecode;
1096                 }
1097                 SAFE_FREE(tdb->transaction->blocks[i]);
1098         }
1099
1100         SAFE_FREE(tdb->transaction->blocks);
1101         tdb->transaction->num_blocks = 0;
1102
1103         /* ensure the new data is on disk */
1104         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1105         if (ecode != TDB_SUCCESS) {
1106                 return tdb->last_error = ecode;
1107         }
1108
1109         /*
1110           TODO: maybe write to some dummy hdr field, or write to magic
1111           offset without mmap, before the last sync, instead of the
1112           utime() call
1113         */
1114
1115         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1116            don't change the mtime of the file, this means the file may
1117            not be backed up (as tdb rounding to block sizes means that
1118            file size changes are quite rare too). The following forces
1119            mtime changes when a transaction completes */
1120 #if HAVE_UTIME
1121         utime(tdb->name, NULL);
1122 #endif
1123
1124         /* use a transaction cancel to free memory and remove the
1125            transaction locks: it "restores" map_size, too. */
1126         tdb->transaction->old_map_size = tdb->file->map_size;
1127         _tdb_transaction_cancel(tdb);
1128
1129         return tdb->last_error = TDB_SUCCESS;
1130 }
1131
1132
1133 /*
1134   recover from an aborted transaction. Must be called with exclusive
1135   database write access already established (including the open
1136   lock to prevent new processes attaching)
1137 */
1138 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1139 {
1140         tdb_off_t recovery_head, recovery_eof;
1141         unsigned char *data, *p;
1142         struct tdb_recovery_record rec;
1143         enum TDB_ERROR ecode;
1144
1145         /* find the recovery area */
1146         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1147         if (TDB_OFF_IS_ERR(recovery_head)) {
1148                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1149                                   "tdb_transaction_recover:"
1150                                   " failed to read recovery head");
1151         }
1152
1153         if (recovery_head == 0) {
1154                 /* we have never allocated a recovery record */
1155                 return TDB_SUCCESS;
1156         }
1157
1158         /* read the recovery record */
1159         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1160         if (ecode != TDB_SUCCESS) {
1161                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1162                                   "tdb_transaction_recover:"
1163                                   " failed to read recovery record");
1164         }
1165
1166         if (rec.magic != TDB_RECOVERY_MAGIC) {
1167                 /* there is no valid recovery data */
1168                 return TDB_SUCCESS;
1169         }
1170
1171         if (tdb->read_only) {
1172                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1173                                   "tdb_transaction_recover:"
1174                                   " attempt to recover read only database");
1175         }
1176
1177         recovery_eof = rec.eof;
1178
1179         data = (unsigned char *)malloc(rec.len);
1180         if (data == NULL) {
1181                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1182                                   "tdb_transaction_recover:"
1183                                   " failed to allocate recovery data");
1184         }
1185
1186         /* read the full recovery data */
1187         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1188                                     rec.len);
1189         if (ecode != TDB_SUCCESS) {
1190                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1191                                   "tdb_transaction_recover:"
1192                                   " failed to read recovery data");
1193         }
1194
1195         /* recover the file data */
1196         p = data;
1197         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1198                 tdb_off_t ofs;
1199                 tdb_len_t len;
1200                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1201                 memcpy(&ofs, p, sizeof(ofs));
1202                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1203                 p += sizeof(ofs) + sizeof(len);
1204
1205                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1206                 if (ecode != TDB_SUCCESS) {
1207                         free(data);
1208                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1209                                           "tdb_transaction_recover:"
1210                                           " failed to recover %zu bytes"
1211                                           " at offset %zu",
1212                                           (size_t)len, (size_t)ofs);
1213                 }
1214                 p += len;
1215         }
1216
1217         free(data);
1218
1219         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1220         if (ecode != TDB_SUCCESS) {
1221                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1222                                   "tdb_transaction_recover:"
1223                                   " failed to sync recovery");
1224         }
1225
1226         /* if the recovery area is after the recovered eof then remove it */
1227         if (recovery_eof <= recovery_head) {
1228                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1229                                                     recovery),
1230                                       0);
1231                 if (ecode != TDB_SUCCESS) {
1232                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1233                                           "tdb_transaction_recover:"
1234                                           " failed to remove recovery head");
1235                 }
1236         }
1237
1238         /* remove the recovery magic */
1239         ecode = tdb_write_off(tdb,
1240                               recovery_head
1241                               + offsetof(struct tdb_recovery_record, magic),
1242                               TDB_RECOVERY_INVALID_MAGIC);
1243         if (ecode != TDB_SUCCESS) {
1244                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1245                                   "tdb_transaction_recover:"
1246                                   " failed to remove recovery magic");
1247         }
1248
1249         ecode = transaction_sync(tdb, 0, recovery_eof);
1250         if (ecode != TDB_SUCCESS) {
1251                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1252                                   "tdb_transaction_recover:"
1253                                   " failed to sync2 recovery");
1254         }
1255
1256         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1257                    "tdb_transaction_recover: recovered %zu byte database",
1258                    (size_t)recovery_eof);
1259
1260         /* all done */
1261         return TDB_SUCCESS;
1262 }
1263
1264 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1265 {
1266         tdb_off_t recovery_head;
1267         struct tdb_recovery_record rec;
1268         enum TDB_ERROR ecode;
1269
1270         /* find the recovery area */
1271         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1272         if (TDB_OFF_IS_ERR(recovery_head)) {
1273                 return recovery_head;
1274         }
1275
1276         if (recovery_head == 0) {
1277                 /* we have never allocated a recovery record */
1278                 return false;
1279         }
1280
1281         /* read the recovery record */
1282         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1283         if (ecode != TDB_SUCCESS) {
1284                 return ecode;
1285         }
1286
1287         return (rec.magic == TDB_RECOVERY_MAGIC);
1288 }