2f588ef37ad1d8dcb8aed53a0adfdf0bf116d6f4
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x if it is non-NULL and then reset it to NULL, so the stale pointer
   cannot be freed or dereferenced again. x is expanded several times, so it
   must be a side-effect-free lvalue. */
#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
/*
  hold the context of any current transaction
*/
struct tdb_transaction {
        /* the original io methods - used to do IOs to the real db, and
           put back into tdb->methods when the transaction is cancelled */
        const struct tdb_methods *io_methods;

        /* the list of transaction blocks. When a block is first
           written to, it gets created in this list. The array is indexed
           by (file offset / PAGESIZE); a NULL entry means the transaction
           has not touched that block */
        uint8_t **blocks;
        /* number of entries in blocks[] */
        size_t num_blocks;
        size_t last_block_size; /* number of valid bytes in the last block */

        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
           transaction is ended */
        int transaction_error;

        /* when inside a transaction we need to keep track of any
           nested tdb_transaction_start() calls, as these are allowed,
           but don't create a new transaction */
        unsigned int nesting;

        /* set when a prepare has already occurred; transaction_write()
           refuses further writes once this is set */
        bool prepared;
        /* when non-zero, the file offset that cancel overwrites with
           TDB_RECOVERY_INVALID_MAGIC to remove the recovery marker */
        tdb_off_t magic_offset;

        /* old file size before transaction; restored into map_size on
           cancel */
        tdb_len_t old_map_size;
};
121
/* Granularity of the transaction block list: every entry of
   tdb_transaction.blocks[] shadows PAGESIZE bytes of the file.
   This doesn't really need to be pagesize, but we use it for similar reasons. */
#define PAGESIZE 65536
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
130                                        void *buf, tdb_len_t len)
131 {
132         size_t blk;
133         enum TDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(tdb, off, buf, len2);
139                 if (ecode != TDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return TDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (tdb->transaction->num_blocks <= blk ||
155             tdb->transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
158                 if (ecode != TDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == tdb->transaction->num_blocks-1) {
166                 if (len > tdb->transaction->last_block_size) {
167                         ecode = TDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % PAGESIZE), len);
174         return TDB_SUCCESS;
175
176 fail:
177         tdb->transaction->transaction_error = 1;
178         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
188                                         const void *buf, tdb_len_t len)
189 {
190         size_t blk;
191         enum TDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (tdb->transaction->prepared) {
195                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(tdb, off, buf, len2);
205                 if (ecode != TDB_SUCCESS) {
206                         return -1;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return TDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (tdb->transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (tdb->transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 tdb->transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
240                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
241                 tdb->transaction->blocks = new_blocks;
242                 tdb->transaction->num_blocks = blk+1;
243                 tdb->transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (tdb->transaction->blocks[blk] == NULL) {
248                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (tdb->transaction->blocks[blk] == NULL) {
250                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (tdb->transaction->old_map_size > blk * PAGESIZE) {
256                         tdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > tdb->transaction->old_map_size) {
258                                 len2 = tdb->transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = tdb->transaction->io_methods->tread(tdb,
261                                         blk * PAGESIZE,
262                                         tdb->transaction->blocks[blk],
263                                         len2);
264                         if (ecode != TDB_SUCCESS) {
265                                 ecode = tdb_logerr(tdb, ecode,
266                                                    TDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(tdb->transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == tdb->transaction->num_blocks-1) {
275                                 tdb->transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(tdb->transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == tdb->transaction->num_blocks-1) {
287                 if (len + off > tdb->transaction->last_block_size) {
288                         tdb->transaction->last_block_size = len + off;
289                 }
290         }
291
292         return TDB_SUCCESS;
293
294 fail:
295         tdb->transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
305                                        const void *buf, tdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(tdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (tdb->transaction->num_blocks <= blk ||
328             tdb->transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == tdb->transaction->num_blocks-1 &&
333             off + len > tdb->transaction->last_block_size) {
334                 if (off >= tdb->transaction->last_block_size) {
335                         return;
336                 }
337                 len = tdb->transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
349                                       bool probe)
350 {
351         if (len <= tdb->file->map_size || probe) {
352                 return TDB_SUCCESS;
353         }
354
355         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
356                    "tdb_oob len %lld beyond transaction size %lld",
357                    (long long)len,
358                    (long long)tdb->file->map_size);
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
373         if (ecode == TDB_SUCCESS) {
374                 tdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 tdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= tdb->transaction->num_blocks
392                     || tdb->transaction->blocks[blk] == NULL) {
393                         tdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         tdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->transaction->num_blocks
403             && tdb->transaction->blocks[blk])
404                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->transaction->num_blocks)
409                         break;
410                 if (tdb->transaction->blocks[blk]) {
411                         tdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return tdb->transaction->io_methods->direct(tdb, off, len, false);
417 }
418
/* io method table installed in tdb->methods while a transaction is
   active (see tdb_transaction_start). The initializer is positional, so
   the entries must stay in the member order of struct tdb_methods. */
static const struct tdb_methods transaction_methods = {
        transaction_read,
        transaction_write,
        transaction_oob,
        transaction_expand_file,
        transaction_direct,
};
426
427 /*
428   sync to disk
429 */
430 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
431                                        tdb_off_t offset, tdb_len_t length)
432 {
433         if (tdb->flags & TDB_NOSYNC) {
434                 return TDB_SUCCESS;
435         }
436
437         if (fsync(tdb->file->fd) != 0) {
438                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
439                                   "tdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (tdb->file->map_ptr) {
444                 tdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)tdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
448                                           "tdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return TDB_SUCCESS;
454 }
455
456
457 static void _tdb_transaction_cancel(struct tdb_context *tdb)
458 {
459         int i;
460         enum TDB_ERROR ecode;
461
462         if (tdb->transaction == NULL) {
463                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
464                            "tdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (tdb->transaction->nesting != 0) {
469                 tdb->transaction->transaction_error = 1;
470                 tdb->transaction->nesting--;
471                 return;
472         }
473
474         tdb->file->map_size = tdb->transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<tdb->transaction->num_blocks;i++) {
478                 if (tdb->transaction->blocks[i] != NULL) {
479                         free(tdb->transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(tdb->transaction->blocks);
483
484         if (tdb->transaction->magic_offset) {
485                 const struct tdb_methods *methods = tdb->transaction->io_methods;
486                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == TDB_SUCCESS)
492                         ecode = transaction_sync(tdb,
493                                                  tdb->transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != TDB_SUCCESS) {
496                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
497                                    "tdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (tdb->file->allrecord_lock.count)
503                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         tdb->methods = tdb->transaction->io_methods;
507
508         tdb_transaction_unlock(tdb, F_WRLCK);
509
510         if (tdb_has_open_lock(tdb))
511                 tdb_unlock_open(tdb, F_WRLCK);
512
513         SAFE_FREE(tdb->transaction);
514 }
515
516 /*
517   start a tdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per tdb_context
519 */
520 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
521 {
522         enum TDB_ERROR ecode;
523
524         tdb->stats.transactions++;
525         /* some sanity checks */
526         if (tdb->flags & TDB_INTERNAL) {
527                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
528                                                     TDB_LOG_USE_ERROR,
529                                                     "tdb_transaction_start:"
530                                                     " cannot start a"
531                                                     " transaction on an"
532                                                     " internal tdb");
533         }
534
535         if (tdb->read_only) {
536                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
537                                                     TDB_LOG_USE_ERROR,
538                                                     "tdb_transaction_start:"
539                                                     " cannot start a"
540                                                     " transaction on a "
541                                                     " read-only tdb");
542         }
543
544         /* cope with nested tdb_transaction_start() calls */
545         if (tdb->transaction != NULL) {
546                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
547                         return tdb->last_error
548                                 = tdb_logerr(tdb, TDB_ERR_IO,
549                                              TDB_LOG_USE_ERROR,
550                                              "tdb_transaction_start:"
551                                              " already inside transaction");
552                 }
553                 tdb->transaction->nesting++;
554                 tdb->stats.transaction_nest++;
555                 return 0;
556         }
557
558         if (tdb_has_hash_locks(tdb)) {
559                 /* the caller must not have any locks when starting a
560                    transaction as otherwise we'll be screwed by lack
561                    of nested locks in POSIX */
562                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
563                                                     TDB_LOG_USE_ERROR,
564                                                     "tdb_transaction_start:"
565                                                     " cannot start a"
566                                                     " transaction with locks"
567                                                     " held");
568         }
569
570         tdb->transaction = (struct tdb_transaction *)
571                 calloc(sizeof(struct tdb_transaction), 1);
572         if (tdb->transaction == NULL) {
573                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
574                                                     TDB_LOG_ERROR,
575                                                     "tdb_transaction_start:"
576                                                     " cannot allocate");
577         }
578
579         /* get the transaction write lock. This is a blocking lock. As
580            discussed with Volker, there are a number of ways we could
581            make this async, which we will probably do in the future */
582         ecode = tdb_transaction_lock(tdb, F_WRLCK);
583         if (ecode != TDB_SUCCESS) {
584                 SAFE_FREE(tdb->transaction->blocks);
585                 SAFE_FREE(tdb->transaction);
586                 return tdb->last_error = ecode;
587         }
588
589         /* get a read lock over entire file. This is upgraded to a write
590            lock during the commit */
591         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
592         if (ecode != TDB_SUCCESS) {
593                 goto fail_allrecord_lock;
594         }
595
596         /* make sure we know about any file expansions already done by
597            anyone else */
598         tdb->methods->oob(tdb, tdb->file->map_size + 1, true);
599         tdb->transaction->old_map_size = tdb->file->map_size;
600
601         /* finally hook the io methods, replacing them with
602            transaction specific methods */
603         tdb->transaction->io_methods = tdb->methods;
604         tdb->methods = &transaction_methods;
605         return tdb->last_error = TDB_SUCCESS;
606
607 fail_allrecord_lock:
608         tdb_transaction_unlock(tdb, F_WRLCK);
609         SAFE_FREE(tdb->transaction->blocks);
610         SAFE_FREE(tdb->transaction);
611         return tdb->last_error = ecode;
612 }
613
614
615 /*
616   cancel the current transaction
617 */
618 void tdb_transaction_cancel(struct tdb_context *tdb)
619 {
620         tdb->stats.transaction_cancel++;
621         _tdb_transaction_cancel(tdb);
622 }
623
624 /*
625   work out how much space the linearised recovery data will consume (worst case)
626 */
627 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
628 {
629         tdb_len_t recovery_size = 0;
630         int i;
631
632         recovery_size = 0;
633         for (i=0;i<tdb->transaction->num_blocks;i++) {
634                 if (i * PAGESIZE >= tdb->transaction->old_map_size) {
635                         break;
636                 }
637                 if (tdb->transaction->blocks[i] == NULL) {
638                         continue;
639                 }
640                 recovery_size += 2*sizeof(tdb_off_t);
641                 if (i == tdb->transaction->num_blocks-1) {
642                         recovery_size += tdb->transaction->last_block_size;
643                 } else {
644                         recovery_size += PAGESIZE;
645                 }
646         }
647
648         return recovery_size;
649 }
650
651 static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
652                                         const struct tdb_methods *methods,
653                                         tdb_off_t *recovery_offset,
654                                         struct tdb_recovery_record *rec)
655 {
656         enum TDB_ERROR ecode;
657
658         *recovery_offset = tdb_read_off(tdb,
659                                         offsetof(struct tdb_header, recovery));
660         if (TDB_OFF_IS_ERR(*recovery_offset)) {
661                 return *recovery_offset;
662         }
663
664         if (*recovery_offset == 0) {
665                 rec->max_len = 0;
666                 return TDB_SUCCESS;
667         }
668
669         ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
670         if (ecode != TDB_SUCCESS)
671                 return ecode;
672
673         tdb_convert(tdb, rec, sizeof(*rec));
674         /* ignore invalid recovery regions: can happen in crash */
675         if (rec->magic != TDB_RECOVERY_MAGIC &&
676             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
677                 *recovery_offset = 0;
678                 rec->max_len = 0;
679         }
680         return TDB_SUCCESS;
681 }
682
/* Length of the common prefix of new and old: the number of leading
   bytes (at most length) that are identical in both buffers. */
static unsigned int same(const unsigned char *new,
                         const unsigned char *old,
                         unsigned int length)
{
        unsigned int matched = 0;

        while (matched < length && new[matched] == old[matched]) {
                matched++;
        }
        return matched;
}
695
/* Measure the leading "differing" stretch of new versus old.

   Scans up to length bytes. The stretch ends at the first run of at
   least min_same identical bytes that is followed by a differing byte,
   or at the end of the buffers. Shorter runs of identical bytes are
   treated as part of the differing stretch.

   Returns the length of the differing stretch and stores in *samelen
   the length of the identical run that follows it (0 when the buffers
   end without a run of at least min_same bytes). */
static unsigned int different(const unsigned char *new,
                              const unsigned char *old,
                              unsigned int length,
                              unsigned int min_same,
                              unsigned int *samelen)
{
        unsigned int pos;
        unsigned int run = 0;   /* identical bytes seen immediately before pos */

        for (pos = 0; pos < length; pos++) {
                if (new[pos] == old[pos]) {
                        run++;
                        continue;
                }
                if (run >= min_same) {
                        /* a long enough identical run just ended */
                        *samelen = run;
                        return pos - run;
                }
                run = 0;
        }

        /* a trailing run only counts when it is long enough */
        if (run < min_same) {
                run = 0;
        }
        *samelen = run;
        return length - run;
}
720
/* Allocates recovery blob, without tdb_recovery_record at head set up.
 *
 * The blob is a malloc'ed tdb_recovery_record followed by a sequence of
 * chunks, each made of a converted (offset, length) pair followed by
 * length bytes of the data currently in the file at that offset, i.e. the
 * contents the transaction is about to overwrite. Only the regions where
 * the transaction block differs from the file are recorded.
 *
 * On success returns the blob (caller frees) and stores the number of
 * payload bytes after the header in *len. On failure returns an error
 * encoded with TDB_ERR_PTR(). tdb->methods is restored on both paths. */
static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
                                                  tdb_len_t *len)
{
        struct tdb_recovery_record *rec;
        size_t i;
        enum TDB_ERROR ecode;
        unsigned char *p;
        const struct tdb_methods *old_methods = tdb->methods;

        /* worst-case sizing: see tdb_recovery_size() */
        rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
        if (!rec) {
                tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
                           "transaction_setup_recovery:"
                           " cannot allocate");
                return TDB_ERR_PTR(TDB_ERR_OOM);
        }

        /* We temporarily revert to the old I/O methods, so we can use
         * tdb_access_read */
        tdb->methods = tdb->transaction->io_methods;

        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
        p = (unsigned char *)(rec + 1);
        for (i=0;i<tdb->transaction->num_blocks;i++) {
                tdb_off_t offset;
                tdb_len_t length;
                unsigned int off;
                const unsigned char *buffer;

                /* block never touched by the transaction */
                if (tdb->transaction->blocks[i] == NULL) {
                        continue;
                }

                offset = i * PAGESIZE;
                length = PAGESIZE;
                if (i == tdb->transaction->num_blocks-1) {
                        length = tdb->transaction->last_block_size;
                }

                /* block starts beyond the old end of file: there is no
                   old data to save for it */
                if (offset >= tdb->transaction->old_map_size) {
                        continue;
                }

                if (offset + length > tdb->file->map_size) {
                        ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
                                           "tdb_transaction_setup_recovery:"
                                           " transaction data over new region"
                                           " boundary");
                        goto fail;
                }
                if (offset + length > tdb->transaction->old_map_size) {
                        /* Short read at EOF. */
                        length = tdb->transaction->old_map_size - offset;
                }
                /* buffer: the data currently in the file for this block */
                buffer = tdb_access_read(tdb, offset, length, false);
                if (TDB_PTR_IS_ERR(buffer)) {
                        ecode = TDB_PTR_ERR(buffer);
                        goto fail;
                }

                /* Skip over anything the same at the start. */
                off = same(tdb->transaction->blocks[i], buffer, length);
                offset += off;

                /* off is the position inside the block, offset the matching
                   file offset; both advance together */
                while (off < length) {
                        tdb_len_t len;
                        unsigned int samelen;

                        /* an identical run only ends a chunk when it is
                           longer than a chunk header (offset + len) */
                        len = different(tdb->transaction->blocks[i] + off,
                                        buffer + off, length - off,
                                        sizeof(offset) + sizeof(len) + 1,
                                        &samelen);

                        /* chunk header: file offset and data length */
                        memcpy(p, &offset, sizeof(offset));
                        memcpy(p + sizeof(offset), &len, sizeof(len));
                        tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
                        p += sizeof(offset) + sizeof(len);
                        /* chunk data: the old contents read from the file */
                        memcpy(p, buffer + off, len);
                        p += len;
                        /* step over the chunk and the identical run after it */
                        off += len + samelen;
                        offset += len + samelen;
                }
                tdb_access_release(tdb, buffer);
        }

        /* number of payload bytes written after the record header */
        *len = p - (unsigned char *)(rec + 1);
        tdb->methods = old_methods;
        return rec;

fail:
        free(rec);
        tdb->methods = old_methods;
        return TDB_ERR_PTR(ecode);
}
817
818 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
819                                       tdb_len_t rec_length,
820                                       struct tdb_recovery_record *rec)
821 {
822         tdb_off_t off, recovery_off;
823         tdb_len_t addition;
824         enum TDB_ERROR ecode;
825         const struct tdb_methods *methods = tdb->transaction->io_methods;
826
827         /* round up to a multiple of page size. Overallocate, since each
828          * such allocation forces us to expand the file. */
829         rec->max_len
830                 = (((sizeof(*rec) + rec_length + rec_length / 2)
831                     + PAGESIZE-1) & ~(PAGESIZE-1))
832                 - sizeof(*rec);
833         off = tdb->file->map_size;
834
835         /* Restore ->map_size before calling underlying expand_file.
836            Also so that we don't try to expand the file again in the
837            transaction commit, which would destroy the recovery
838            area */
839         addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
840                 sizeof(*rec) + rec->max_len;
841         tdb->file->map_size = tdb->transaction->old_map_size;
842         tdb->stats.transaction_expand_file++;
843         ecode = methods->expand_file(tdb, addition);
844         if (ecode != TDB_SUCCESS) {
845                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
846                                   "tdb_recovery_allocate:"
847                                   " failed to create recovery area");
848         }
849
850         /* we have to reset the old map size so that we don't try to
851            expand the file again in the transaction commit, which
852            would destroy the recovery area */
853         tdb->transaction->old_map_size = tdb->file->map_size;
854
855         /* write the recovery header offset and sync - we can sync without a race here
856            as the magic ptr in the recovery record has not been set */
857         recovery_off = off;
858         tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
859         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
860                                 &recovery_off, sizeof(tdb_off_t));
861         if (ecode != TDB_SUCCESS) {
862                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
863                                   "tdb_recovery_allocate:"
864                                   " failed to write recovery head");
865         }
866         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
867                                    &recovery_off,
868                                    sizeof(tdb_off_t));
869         return off;
870 }
871
872 /*
873   setup the recovery data that will be used on a crash during commit
874 */
875 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
876 {
877         tdb_len_t recovery_size = 0;
878         tdb_off_t recovery_off = 0;
879         tdb_off_t old_map_size = tdb->transaction->old_map_size;
880         struct tdb_recovery_record *recovery;
881         const struct tdb_methods *methods = tdb->transaction->io_methods;
882         uint64_t magic;
883         enum TDB_ERROR ecode;
884
885         recovery = alloc_recovery(tdb, &recovery_size);
886         if (TDB_PTR_IS_ERR(recovery))
887                 return TDB_PTR_ERR(recovery);
888
889         ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
890         if (ecode) {
891                 free(recovery);
892                 return ecode;
893         }
894
895         if (recovery->max_len < recovery_size) {
896                 /* Not large enough. Free up old recovery area. */
897                 if (recovery_off) {
898                         tdb->stats.frees++;
899                         ecode = add_free_record(tdb, recovery_off,
900                                                 sizeof(*recovery)
901                                                 + recovery->max_len,
902                                                 TDB_LOCK_WAIT, true);
903                         free(recovery);
904                         if (ecode != TDB_SUCCESS) {
905                                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
906                                                   "tdb_recovery_allocate:"
907                                                   " failed to free previous"
908                                                   " recovery area");
909                         }
910
911                         /* Refresh recovery after add_free_record above. */
912                         recovery = alloc_recovery(tdb, &recovery_size);
913                         if (TDB_PTR_IS_ERR(recovery))
914                                 return TDB_PTR_ERR(recovery);
915                 }
916
917                 recovery_off = create_recovery_area(tdb, recovery_size,
918                                                     recovery);
919                 if (TDB_OFF_IS_ERR(recovery_off)) {
920                         free(recovery);
921                         return recovery_off;
922                 }
923         }
924
925         /* Now we know size, convert rec header. */
926         recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
927         recovery->len = recovery_size;
928         recovery->eof = old_map_size;
929         tdb_convert(tdb, recovery, sizeof(*recovery));
930
931         /* write the recovery data to the recovery area */
932         ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
933         if (ecode != TDB_SUCCESS) {
934                 free(recovery);
935                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
936                                   "tdb_transaction_setup_recovery:"
937                                   " failed to write recovery data");
938         }
939         transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
940
941         free(recovery);
942
943         /* as we don't have ordered writes, we have to sync the recovery
944            data before we update the magic to indicate that the recovery
945            data is present */
946         ecode = transaction_sync(tdb, recovery_off, recovery_size);
947         if (ecode != TDB_SUCCESS)
948                 return ecode;
949
950         magic = TDB_RECOVERY_MAGIC;
951         tdb_convert(tdb, &magic, sizeof(magic));
952
953         tdb->transaction->magic_offset
954                 = recovery_off + offsetof(struct tdb_recovery_record, magic);
955
956         ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
957                                 &magic, sizeof(magic));
958         if (ecode != TDB_SUCCESS) {
959                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
960                                   "tdb_transaction_setup_recovery:"
961                                   " failed to write recovery magic");
962         }
963         transaction_write_existing(tdb, tdb->transaction->magic_offset,
964                                    &magic, sizeof(magic));
965
966         /* ensure the recovery magic marker is on disk */
967         return transaction_sync(tdb, tdb->transaction->magic_offset,
968                                 sizeof(magic));
969 }
970
971 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
972 {
973         const struct tdb_methods *methods;
974         enum TDB_ERROR ecode;
975
976         if (tdb->transaction == NULL) {
977                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
978                                   "tdb_transaction_prepare_commit:"
979                                   " no transaction");
980         }
981
982         if (tdb->transaction->prepared) {
983                 _tdb_transaction_cancel(tdb);
984                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
985                                   "tdb_transaction_prepare_commit:"
986                                   " transaction already prepared");
987         }
988
989         if (tdb->transaction->transaction_error) {
990                 _tdb_transaction_cancel(tdb);
991                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
992                                   "tdb_transaction_prepare_commit:"
993                                   " transaction error pending");
994         }
995
996
997         if (tdb->transaction->nesting != 0) {
998                 return TDB_SUCCESS;
999         }
1000
1001         /* check for a null transaction */
1002         if (tdb->transaction->blocks == NULL) {
1003                 return TDB_SUCCESS;
1004         }
1005
1006         methods = tdb->transaction->io_methods;
1007
1008         /* upgrade the main transaction lock region to a write lock */
1009         ecode = tdb_allrecord_upgrade(tdb);
1010         if (ecode != TDB_SUCCESS) {
1011                 return ecode;
1012         }
1013
1014         /* get the open lock - this prevents new users attaching to the database
1015            during the commit */
1016         ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
1017         if (ecode != TDB_SUCCESS) {
1018                 return ecode;
1019         }
1020
1021         /* Since we have whole db locked, we don't need the expansion lock. */
1022         if (!(tdb->flags & TDB_NOSYNC)) {
1023                 /* Sets up tdb->transaction->recovery and
1024                  * tdb->transaction->magic_offset. */
1025                 ecode = transaction_setup_recovery(tdb);
1026                 if (ecode != TDB_SUCCESS) {
1027                         return ecode;
1028                 }
1029         }
1030
1031         tdb->transaction->prepared = true;
1032
1033         /* expand the file to the new size if needed */
1034         if (tdb->file->map_size != tdb->transaction->old_map_size) {
1035                 tdb_len_t add;
1036
1037                 add = tdb->file->map_size - tdb->transaction->old_map_size;
1038                 /* Restore original map size for tdb_expand_file */
1039                 tdb->file->map_size = tdb->transaction->old_map_size;
1040                 ecode = methods->expand_file(tdb, add);
1041                 if (ecode != TDB_SUCCESS) {
1042                         return ecode;
1043                 }
1044         }
1045
1046         /* Keep the open lock until the actual commit */
1047         return TDB_SUCCESS;
1048 }
1049
1050 /*
1051    prepare to commit the current transaction
1052 */
1053 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1054 {
1055         return _tdb_transaction_prepare_commit(tdb);
1056 }
1057
1058 /*
1059   commit the current transaction
1060 */
1061 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1062 {
1063         const struct tdb_methods *methods;
1064         int i;
1065         enum TDB_ERROR ecode;
1066
1067         if (tdb->transaction == NULL) {
1068                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1069                                                     TDB_LOG_USE_ERROR,
1070                                                     "tdb_transaction_commit:"
1071                                                     " no transaction");
1072         }
1073
1074         tdb_trace(tdb, "tdb_transaction_commit");
1075
1076         if (tdb->transaction->nesting != 0) {
1077                 tdb->transaction->nesting--;
1078                 return tdb->last_error = TDB_SUCCESS;
1079         }
1080
1081         /* check for a null transaction */
1082         if (tdb->transaction->blocks == NULL) {
1083                 _tdb_transaction_cancel(tdb);
1084                 return tdb->last_error = TDB_SUCCESS;
1085         }
1086
1087         if (!tdb->transaction->prepared) {
1088                 ecode = _tdb_transaction_prepare_commit(tdb);
1089                 if (ecode != TDB_SUCCESS) {
1090                         _tdb_transaction_cancel(tdb);
1091                         return tdb->last_error = ecode;
1092                 }
1093         }
1094
1095         methods = tdb->transaction->io_methods;
1096
1097         /* perform all the writes */
1098         for (i=0;i<tdb->transaction->num_blocks;i++) {
1099                 tdb_off_t offset;
1100                 tdb_len_t length;
1101
1102                 if (tdb->transaction->blocks[i] == NULL) {
1103                         continue;
1104                 }
1105
1106                 offset = i * PAGESIZE;
1107                 length = PAGESIZE;
1108                 if (i == tdb->transaction->num_blocks-1) {
1109                         length = tdb->transaction->last_block_size;
1110                 }
1111
1112                 ecode = methods->twrite(tdb, offset,
1113                                         tdb->transaction->blocks[i], length);
1114                 if (ecode != TDB_SUCCESS) {
1115                         /* we've overwritten part of the data and
1116                            possibly expanded the file, so we need to
1117                            run the crash recovery code */
1118                         tdb->methods = methods;
1119                         tdb_transaction_recover(tdb);
1120
1121                         _tdb_transaction_cancel(tdb);
1122
1123                         return tdb->last_error = ecode;
1124                 }
1125                 SAFE_FREE(tdb->transaction->blocks[i]);
1126         }
1127
1128         SAFE_FREE(tdb->transaction->blocks);
1129         tdb->transaction->num_blocks = 0;
1130
1131         /* ensure the new data is on disk */
1132         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1133         if (ecode != TDB_SUCCESS) {
1134                 return tdb->last_error = ecode;
1135         }
1136
1137         /*
1138           TODO: maybe write to some dummy hdr field, or write to magic
1139           offset without mmap, before the last sync, instead of the
1140           utime() call
1141         */
1142
1143         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1144            don't change the mtime of the file, this means the file may
1145            not be backed up (as tdb rounding to block sizes means that
1146            file size changes are quite rare too). The following forces
1147            mtime changes when a transaction completes */
1148 #if HAVE_UTIME
1149         utime(tdb->name, NULL);
1150 #endif
1151
1152         /* use a transaction cancel to free memory and remove the
1153            transaction locks: it "restores" map_size, too. */
1154         tdb->transaction->old_map_size = tdb->file->map_size;
1155         _tdb_transaction_cancel(tdb);
1156
1157         return tdb->last_error = TDB_SUCCESS;
1158 }
1159
1160
1161 /*
1162   recover from an aborted transaction. Must be called with exclusive
1163   database write access already established (including the open
1164   lock to prevent new processes attaching)
1165 */
1166 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1167 {
1168         tdb_off_t recovery_head, recovery_eof;
1169         unsigned char *data, *p;
1170         struct tdb_recovery_record rec;
1171         enum TDB_ERROR ecode;
1172
1173         /* find the recovery area */
1174         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1175         if (TDB_OFF_IS_ERR(recovery_head)) {
1176                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1177                                   "tdb_transaction_recover:"
1178                                   " failed to read recovery head");
1179         }
1180
1181         if (recovery_head == 0) {
1182                 /* we have never allocated a recovery record */
1183                 return TDB_SUCCESS;
1184         }
1185
1186         /* read the recovery record */
1187         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1188         if (ecode != TDB_SUCCESS) {
1189                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1190                                   "tdb_transaction_recover:"
1191                                   " failed to read recovery record");
1192         }
1193
1194         if (rec.magic != TDB_RECOVERY_MAGIC) {
1195                 /* there is no valid recovery data */
1196                 return TDB_SUCCESS;
1197         }
1198
1199         if (tdb->read_only) {
1200                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1201                                   "tdb_transaction_recover:"
1202                                   " attempt to recover read only database");
1203         }
1204
1205         recovery_eof = rec.eof;
1206
1207         data = (unsigned char *)malloc(rec.len);
1208         if (data == NULL) {
1209                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1210                                   "tdb_transaction_recover:"
1211                                   " failed to allocate recovery data");
1212         }
1213
1214         /* read the full recovery data */
1215         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1216                                     rec.len);
1217         if (ecode != TDB_SUCCESS) {
1218                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1219                                   "tdb_transaction_recover:"
1220                                   " failed to read recovery data");
1221         }
1222
1223         /* recover the file data */
1224         p = data;
1225         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1226                 tdb_off_t ofs;
1227                 tdb_len_t len;
1228                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1229                 memcpy(&ofs, p, sizeof(ofs));
1230                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1231                 p += sizeof(ofs) + sizeof(len);
1232
1233                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1234                 if (ecode != TDB_SUCCESS) {
1235                         free(data);
1236                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1237                                           "tdb_transaction_recover:"
1238                                           " failed to recover %zu bytes"
1239                                           " at offset %zu",
1240                                           (size_t)len, (size_t)ofs);
1241                 }
1242                 p += len;
1243         }
1244
1245         free(data);
1246
1247         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1248         if (ecode != TDB_SUCCESS) {
1249                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1250                                   "tdb_transaction_recover:"
1251                                   " failed to sync recovery");
1252         }
1253
1254         /* if the recovery area is after the recovered eof then remove it */
1255         if (recovery_eof <= recovery_head) {
1256                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1257                                                     recovery),
1258                                       0);
1259                 if (ecode != TDB_SUCCESS) {
1260                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1261                                           "tdb_transaction_recover:"
1262                                           " failed to remove recovery head");
1263                 }
1264         }
1265
1266         /* remove the recovery magic */
1267         ecode = tdb_write_off(tdb,
1268                               recovery_head
1269                               + offsetof(struct tdb_recovery_record, magic),
1270                               TDB_RECOVERY_INVALID_MAGIC);
1271         if (ecode != TDB_SUCCESS) {
1272                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1273                                   "tdb_transaction_recover:"
1274                                   " failed to remove recovery magic");
1275         }
1276
1277         ecode = transaction_sync(tdb, 0, recovery_eof);
1278         if (ecode != TDB_SUCCESS) {
1279                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1280                                   "tdb_transaction_recover:"
1281                                   " failed to sync2 recovery");
1282         }
1283
1284         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1285                    "tdb_transaction_recover: recovered %zu byte database",
1286                    (size_t)recovery_eof);
1287
1288         /* all done */
1289         return TDB_SUCCESS;
1290 }
1291
1292 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1293 {
1294         tdb_off_t recovery_head;
1295         struct tdb_recovery_record rec;
1296         enum TDB_ERROR ecode;
1297
1298         /* find the recovery area */
1299         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1300         if (TDB_OFF_IS_ERR(recovery_head)) {
1301                 return recovery_head;
1302         }
1303
1304         if (recovery_head == 0) {
1305                 /* we have never allocated a recovery record */
1306                 return false;
1307         }
1308
1309         /* read the recovery record */
1310         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1311         if (ecode != TDB_SUCCESS) {
1312                 return ecode;
1313         }
1314
1315         return (rec.magic == TDB_RECOVERY_MAGIC);
1316 }