]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
ccanlint: fix dependencies on tests_pass_without_features.
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free a pointer and reset it to NULL; free(NULL) is a no-op so no guard is needed. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* the original io methods - used to do IOs to the real db */
96         const struct tdb_methods *io_methods;
97
98         /* the list of transaction blocks. When a block is first
99            written to, it gets created in this list */
100         uint8_t **blocks;
101         size_t num_blocks;
102         size_t last_block_size; /* number of valid bytes in the last block */
103
104         /* non-zero when an internal transaction error has
105            occurred. All write operations will then fail until the
106            transaction is ended */
107         int transaction_error;
108
109         /* when inside a transaction we need to keep track of any
110            nested tdb_transaction_start() calls, as these are allowed,
111            but don't create a new transaction */
112         unsigned int nesting;
113
114         /* set when a prepare has already occurred */
115         bool prepared;
116         tdb_off_t magic_offset;
117
118         /* old file size before transaction */
119         tdb_len_t old_map_size;
120 };
121
122 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
123 #define PAGESIZE 65536
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
130                                        void *buf, tdb_len_t len)
131 {
132         size_t blk;
133         enum TDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(tdb, off, buf, len2);
139                 if (ecode != TDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return TDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (tdb->transaction->num_blocks <= blk ||
155             tdb->transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
158                 if (ecode != TDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == tdb->transaction->num_blocks-1) {
166                 if (len > tdb->transaction->last_block_size) {
167                         ecode = TDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % PAGESIZE), len);
174         return TDB_SUCCESS;
175
176 fail:
177         tdb->transaction->transaction_error = 1;
178         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
188                                         const void *buf, tdb_len_t len)
189 {
190         size_t blk;
191         enum TDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (tdb->transaction->prepared) {
195                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(tdb, off, buf, len2);
205                 if (ecode != TDB_SUCCESS) {
206                         return -1;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return TDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (tdb->transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (tdb->transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 tdb->transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
240                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
241                 tdb->transaction->blocks = new_blocks;
242                 tdb->transaction->num_blocks = blk+1;
243                 tdb->transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (tdb->transaction->blocks[blk] == NULL) {
248                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (tdb->transaction->blocks[blk] == NULL) {
250                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (tdb->transaction->old_map_size > blk * PAGESIZE) {
256                         tdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > tdb->transaction->old_map_size) {
258                                 len2 = tdb->transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = tdb->transaction->io_methods->tread(tdb,
261                                         blk * PAGESIZE,
262                                         tdb->transaction->blocks[blk],
263                                         len2);
264                         if (ecode != TDB_SUCCESS) {
265                                 ecode = tdb_logerr(tdb, ecode,
266                                                    TDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(tdb->transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == tdb->transaction->num_blocks-1) {
275                                 tdb->transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(tdb->transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == tdb->transaction->num_blocks-1) {
287                 if (len + off > tdb->transaction->last_block_size) {
288                         tdb->transaction->last_block_size = len + off;
289                 }
290         }
291
292         return TDB_SUCCESS;
293
294 fail:
295         tdb->transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
305                                        const void *buf, tdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(tdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (tdb->transaction->num_blocks <= blk ||
328             tdb->transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == tdb->transaction->num_blocks-1 &&
333             off + len > tdb->transaction->last_block_size) {
334                 if (off >= tdb->transaction->last_block_size) {
335                         return;
336                 }
337                 len = tdb->transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
349                                       bool probe)
350 {
351         if (len <= tdb->file->map_size) {
352                 return TDB_SUCCESS;
353         }
354         if (!probe) {
355                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
356                            "tdb_oob len %lld beyond transaction size %lld",
357                            (long long)len,
358                            (long long)tdb->file->map_size);
359         }
360         return TDB_ERR_IO;
361 }
362
363 /*
364   transaction version of tdb_expand().
365 */
366 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
367                                               tdb_off_t addition)
368 {
369         enum TDB_ERROR ecode;
370
371         /* add a write to the transaction elements, so subsequent
372            reads see the zero data */
373         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
374         if (ecode == TDB_SUCCESS) {
375                 tdb->file->map_size += addition;
376         }
377         return ecode;
378 }
379
380 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
381                                 size_t len, bool write_mode)
382 {
383         size_t blk = off / PAGESIZE, end_blk;
384
385         /* This is wrong for zero-length blocks, but will fail gracefully */
386         end_blk = (off + len - 1) / PAGESIZE;
387
388         /* Can only do direct if in single block and we've already copied. */
389         if (write_mode) {
390                 tdb->stats.transaction_write_direct++;
391                 if (blk != end_blk
392                     || blk >= tdb->transaction->num_blocks
393                     || tdb->transaction->blocks[blk] == NULL) {
394                         tdb->stats.transaction_write_direct_fail++;
395                         return NULL;
396                 }
397                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
398         }
399
400         tdb->stats.transaction_read_direct++;
401         /* Single which we have copied? */
402         if (blk == end_blk
403             && blk < tdb->transaction->num_blocks
404             && tdb->transaction->blocks[blk])
405                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
406
407         /* Otherwise must be all not copied. */
408         while (blk <= end_blk) {
409                 if (blk >= tdb->transaction->num_blocks)
410                         break;
411                 if (tdb->transaction->blocks[blk]) {
412                         tdb->stats.transaction_read_direct_fail++;
413                         return NULL;
414                 }
415                 blk++;
416         }
417         return tdb->transaction->io_methods->direct(tdb, off, len, false);
418 }
419
420 static const struct tdb_methods transaction_methods = {
421         transaction_read,
422         transaction_write,
423         transaction_oob,
424         transaction_expand_file,
425         transaction_direct,
426 };
427
428 /*
429   sync to disk
430 */
431 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
432                                        tdb_off_t offset, tdb_len_t length)
433 {
434         if (tdb->flags & TDB_NOSYNC) {
435                 return TDB_SUCCESS;
436         }
437
438         if (fsync(tdb->file->fd) != 0) {
439                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
440                                   "tdb_transaction: fsync failed: %s",
441                                   strerror(errno));
442         }
443 #ifdef MS_SYNC
444         if (tdb->file->map_ptr) {
445                 tdb_off_t moffset = offset & ~(getpagesize()-1);
446                 if (msync(moffset + (char *)tdb->file->map_ptr,
447                           length + (offset - moffset), MS_SYNC) != 0) {
448                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
449                                           "tdb_transaction: msync failed: %s",
450                                           strerror(errno));
451                 }
452         }
453 #endif
454         return TDB_SUCCESS;
455 }
456
457
458 static void _tdb_transaction_cancel(struct tdb_context *tdb)
459 {
460         int i;
461         enum TDB_ERROR ecode;
462
463         if (tdb->transaction == NULL) {
464                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
465                            "tdb_transaction_cancel: no transaction");
466                 return;
467         }
468
469         if (tdb->transaction->nesting != 0) {
470                 tdb->transaction->transaction_error = 1;
471                 tdb->transaction->nesting--;
472                 return;
473         }
474
475         tdb->file->map_size = tdb->transaction->old_map_size;
476
477         /* free all the transaction blocks */
478         for (i=0;i<tdb->transaction->num_blocks;i++) {
479                 if (tdb->transaction->blocks[i] != NULL) {
480                         free(tdb->transaction->blocks[i]);
481                 }
482         }
483         SAFE_FREE(tdb->transaction->blocks);
484
485         if (tdb->transaction->magic_offset) {
486                 const struct tdb_methods *methods = tdb->transaction->io_methods;
487                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
488
489                 /* remove the recovery marker */
490                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
491                                         &invalid, sizeof(invalid));
492                 if (ecode == TDB_SUCCESS)
493                         ecode = transaction_sync(tdb,
494                                                  tdb->transaction->magic_offset,
495                                                  sizeof(invalid));
496                 if (ecode != TDB_SUCCESS) {
497                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
498                                    "tdb_transaction_cancel: failed to remove"
499                                    " recovery magic");
500                 }
501         }
502
503         if (tdb->file->allrecord_lock.count)
504                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
505
506         /* restore the normal io methods */
507         tdb->methods = tdb->transaction->io_methods;
508
509         tdb_transaction_unlock(tdb, F_WRLCK);
510
511         if (tdb_has_open_lock(tdb))
512                 tdb_unlock_open(tdb, F_WRLCK);
513
514         SAFE_FREE(tdb->transaction);
515 }
516
517 /*
518   start a tdb transaction. No token is returned, as only a single
519   transaction is allowed to be pending per tdb_context
520 */
521 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
522 {
523         enum TDB_ERROR ecode;
524
525         tdb->stats.transactions++;
526         /* some sanity checks */
527         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
528                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
529                                                     TDB_LOG_USE_ERROR,
530                                                     "tdb_transaction_start:"
531                                                     " cannot start a"
532                                                     " transaction on a "
533                                                     "read-only or internal db");
534         }
535
536         /* cope with nested tdb_transaction_start() calls */
537         if (tdb->transaction != NULL) {
538                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
539                         return tdb->last_error
540                                 = tdb_logerr(tdb, TDB_ERR_IO,
541                                              TDB_LOG_USE_ERROR,
542                                              "tdb_transaction_start:"
543                                              " already inside transaction");
544                 }
545                 tdb->transaction->nesting++;
546                 tdb->stats.transaction_nest++;
547                 return 0;
548         }
549
550         if (tdb_has_hash_locks(tdb)) {
551                 /* the caller must not have any locks when starting a
552                    transaction as otherwise we'll be screwed by lack
553                    of nested locks in POSIX */
554                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
555                                                     TDB_LOG_USE_ERROR,
556                                                     "tdb_transaction_start:"
557                                                     " cannot start a"
558                                                     " transaction with locks"
559                                                     " held");
560         }
561
562         tdb->transaction = (struct tdb_transaction *)
563                 calloc(sizeof(struct tdb_transaction), 1);
564         if (tdb->transaction == NULL) {
565                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
566                                                     TDB_LOG_ERROR,
567                                                     "tdb_transaction_start:"
568                                                     " cannot allocate");
569         }
570
571         /* get the transaction write lock. This is a blocking lock. As
572            discussed with Volker, there are a number of ways we could
573            make this async, which we will probably do in the future */
574         ecode = tdb_transaction_lock(tdb, F_WRLCK);
575         if (ecode != TDB_SUCCESS) {
576                 SAFE_FREE(tdb->transaction->blocks);
577                 SAFE_FREE(tdb->transaction);
578                 return tdb->last_error = ecode;
579         }
580
581         /* get a read lock over entire file. This is upgraded to a write
582            lock during the commit */
583         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
584         if (ecode != TDB_SUCCESS) {
585                 goto fail_allrecord_lock;
586         }
587
588         /* make sure we know about any file expansions already done by
589            anyone else */
590         tdb->methods->oob(tdb, tdb->file->map_size + 1, true);
591         tdb->transaction->old_map_size = tdb->file->map_size;
592
593         /* finally hook the io methods, replacing them with
594            transaction specific methods */
595         tdb->transaction->io_methods = tdb->methods;
596         tdb->methods = &transaction_methods;
597         return tdb->last_error = TDB_SUCCESS;
598
599 fail_allrecord_lock:
600         tdb_transaction_unlock(tdb, F_WRLCK);
601         SAFE_FREE(tdb->transaction->blocks);
602         SAFE_FREE(tdb->transaction);
603         return tdb->last_error = ecode;
604 }
605
606
607 /*
608   cancel the current transaction
609 */
610 void tdb_transaction_cancel(struct tdb_context *tdb)
611 {
612         tdb->stats.transaction_cancel++;
613         _tdb_transaction_cancel(tdb);
614 }
615
616 /*
617   work out how much space the linearised recovery data will consume (worst case)
618 */
619 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
620 {
621         tdb_len_t recovery_size = 0;
622         int i;
623
624         recovery_size = 0;
625         for (i=0;i<tdb->transaction->num_blocks;i++) {
626                 if (i * PAGESIZE >= tdb->transaction->old_map_size) {
627                         break;
628                 }
629                 if (tdb->transaction->blocks[i] == NULL) {
630                         continue;
631                 }
632                 recovery_size += 2*sizeof(tdb_off_t);
633                 if (i == tdb->transaction->num_blocks-1) {
634                         recovery_size += tdb->transaction->last_block_size;
635                 } else {
636                         recovery_size += PAGESIZE;
637                 }
638         }
639
640         return recovery_size;
641 }
642
643 static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
644                                         const struct tdb_methods *methods,
645                                         tdb_off_t *recovery_offset,
646                                         struct tdb_recovery_record *rec)
647 {
648         enum TDB_ERROR ecode;
649
650         *recovery_offset = tdb_read_off(tdb,
651                                         offsetof(struct tdb_header, recovery));
652         if (TDB_OFF_IS_ERR(*recovery_offset)) {
653                 return *recovery_offset;
654         }
655
656         if (*recovery_offset == 0) {
657                 rec->max_len = 0;
658                 return TDB_SUCCESS;
659         }
660
661         ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
662         if (ecode != TDB_SUCCESS)
663                 return ecode;
664
665         tdb_convert(tdb, rec, sizeof(*rec));
666         /* ignore invalid recovery regions: can happen in crash */
667         if (rec->magic != TDB_RECOVERY_MAGIC &&
668             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
669                 *recovery_offset = 0;
670                 rec->max_len = 0;
671         }
672         return TDB_SUCCESS;
673 }
674
/* Length of the common prefix of new and old, looking at no more than
 * length bytes. */
static unsigned int same(const unsigned char *new,
                         const unsigned char *old,
                         unsigned int length)
{
        unsigned int matched = 0;

        while (matched < length && new[matched] == old[matched]) {
                matched++;
        }
        return matched;
}
687
/* Measure the leading stretch of new/old that should be treated as
 * "different". The stretch ends at the first run of at least min_same
 * identical bytes that is followed by a mismatch, or at the end of the
 * buffers. Shorter identical runs are absorbed into the stretch.
 *
 * Returns the length of the differing stretch; *samelen receives the
 * length of the identical run that follows it (0 if there is none of at
 * least min_same bytes). */
static unsigned int different(const unsigned char *new,
                              const unsigned char *old,
                              unsigned int length,
                              unsigned int min_same,
                              unsigned int *samelen)
{
        unsigned int run = 0;   /* current count of consecutive equal bytes */
        unsigned int pos;

        for (pos = 0; pos < length; pos++) {
                if (new[pos] == old[pos]) {
                        run++;
                        continue;
                }
                /* mismatch: a long enough run before it ends the stretch */
                if (run >= min_same) {
                        *samelen = run;
                        return pos - run;
                }
                run = 0;
        }

        /* reached the end: a trailing run only counts if long enough */
        if (run < min_same) {
                run = 0;
        }
        *samelen = run;
        return length - run;
}
712
713 /* Allocates recovery blob, without tdb_recovery_record at head set up. */
714 static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
715                                                   tdb_len_t *len)
716 {
717         struct tdb_recovery_record *rec;
718         size_t i;
719         enum TDB_ERROR ecode;
720         unsigned char *p;
721         const struct tdb_methods *old_methods = tdb->methods;
722
723         rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
724         if (!rec) {
725                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
726                            "transaction_setup_recovery:"
727                            " cannot allocate");
728                 return TDB_ERR_PTR(TDB_ERR_OOM);
729         }
730
731         /* We temporarily revert to the old I/O methods, so we can use
732          * tdb_access_read */
733         tdb->methods = tdb->transaction->io_methods;
734
735         /* build the recovery data into a single blob to allow us to do a single
736            large write, which should be more efficient */
737         p = (unsigned char *)(rec + 1);
738         for (i=0;i<tdb->transaction->num_blocks;i++) {
739                 tdb_off_t offset;
740                 tdb_len_t length;
741                 unsigned int off;
742                 const unsigned char *buffer;
743
744                 if (tdb->transaction->blocks[i] == NULL) {
745                         continue;
746                 }
747
748                 offset = i * PAGESIZE;
749                 length = PAGESIZE;
750                 if (i == tdb->transaction->num_blocks-1) {
751                         length = tdb->transaction->last_block_size;
752                 }
753
754                 if (offset >= tdb->transaction->old_map_size) {
755                         continue;
756                 }
757
758                 if (offset + length > tdb->file->map_size) {
759                         ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
760                                            "tdb_transaction_setup_recovery:"
761                                            " transaction data over new region"
762                                            " boundary");
763                         goto fail;
764                 }
765                 if (offset + length > tdb->transaction->old_map_size) {
766                         /* Short read at EOF. */
767                         length = tdb->transaction->old_map_size - offset;
768                 }
769                 buffer = tdb_access_read(tdb, offset, length, false);
770                 if (TDB_PTR_IS_ERR(buffer)) {
771                         ecode = TDB_PTR_ERR(buffer);
772                         goto fail;
773                 }
774
775                 /* Skip over anything the same at the start. */
776                 off = same(tdb->transaction->blocks[i], buffer, length);
777                 offset += off;
778
779                 while (off < length) {
780                         tdb_len_t len;
781                         unsigned int samelen;
782
783                         len = different(tdb->transaction->blocks[i] + off,
784                                         buffer + off, length - off,
785                                         sizeof(offset) + sizeof(len) + 1,
786                                         &samelen);
787
788                         memcpy(p, &offset, sizeof(offset));
789                         memcpy(p + sizeof(offset), &len, sizeof(len));
790                         tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
791                         p += sizeof(offset) + sizeof(len);
792                         memcpy(p, buffer + off, len);
793                         p += len;
794                         off += len + samelen;
795                         offset += len + samelen;
796                 }
797                 tdb_access_release(tdb, buffer);
798         }
799
800         *len = p - (unsigned char *)(rec + 1);
801         tdb->methods = old_methods;
802         return rec;
803
804 fail:
805         free(rec);
806         tdb->methods = old_methods;
807         return TDB_ERR_PTR(ecode);
808 }
809
810 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
811                                       tdb_len_t rec_length,
812                                       struct tdb_recovery_record *rec)
813 {
814         tdb_off_t off, recovery_off;
815         tdb_len_t addition;
816         enum TDB_ERROR ecode;
817         const struct tdb_methods *methods = tdb->transaction->io_methods;
818
819         /* round up to a multiple of page size. Overallocate, since each
820          * such allocation forces us to expand the file. */
821         rec->max_len
822                 = (((sizeof(*rec) + rec_length + rec_length / 2)
823                     + PAGESIZE-1) & ~(PAGESIZE-1))
824                 - sizeof(*rec);
825         off = tdb->file->map_size;
826
827         /* Restore ->map_size before calling underlying expand_file.
828            Also so that we don't try to expand the file again in the
829            transaction commit, which would destroy the recovery
830            area */
831         addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
832                 sizeof(*rec) + rec->max_len;
833         tdb->file->map_size = tdb->transaction->old_map_size;
834         tdb->stats.transaction_expand_file++;
835         ecode = methods->expand_file(tdb, addition);
836         if (ecode != TDB_SUCCESS) {
837                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
838                                   "tdb_recovery_allocate:"
839                                   " failed to create recovery area");
840         }
841
842         /* we have to reset the old map size so that we don't try to
843            expand the file again in the transaction commit, which
844            would destroy the recovery area */
845         tdb->transaction->old_map_size = tdb->file->map_size;
846
847         /* write the recovery header offset and sync - we can sync without a race here
848            as the magic ptr in the recovery record has not been set */
849         recovery_off = off;
850         tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
851         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
852                                 &recovery_off, sizeof(tdb_off_t));
853         if (ecode != TDB_SUCCESS) {
854                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
855                                   "tdb_recovery_allocate:"
856                                   " failed to write recovery head");
857         }
858         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
859                                    &recovery_off,
860                                    sizeof(tdb_off_t));
861         return off;
862 }
863
864 /*
865   setup the recovery data that will be used on a crash during commit
866 */
867 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
868 {
869         tdb_len_t recovery_size = 0;
870         tdb_off_t recovery_off = 0;
871         tdb_off_t old_map_size = tdb->transaction->old_map_size;
872         struct tdb_recovery_record *recovery;
873         const struct tdb_methods *methods = tdb->transaction->io_methods;
874         uint64_t magic;
875         enum TDB_ERROR ecode;
876
877         recovery = alloc_recovery(tdb, &recovery_size);
878         if (TDB_PTR_IS_ERR(recovery))
879                 return TDB_PTR_ERR(recovery);
880
881         ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
882         if (ecode) {
883                 free(recovery);
884                 return ecode;
885         }
886
887         if (recovery->max_len < recovery_size) {
888                 /* Not large enough. Free up old recovery area. */
889                 if (recovery_off) {
890                         tdb->stats.frees++;
891                         ecode = add_free_record(tdb, recovery_off,
892                                                 sizeof(*recovery)
893                                                 + recovery->max_len,
894                                                 TDB_LOCK_WAIT, true);
895                         free(recovery);
896                         if (ecode != TDB_SUCCESS) {
897                                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
898                                                   "tdb_recovery_allocate:"
899                                                   " failed to free previous"
900                                                   " recovery area");
901                         }
902
903                         /* Refresh recovery after add_free_record above. */
904                         recovery = alloc_recovery(tdb, &recovery_size);
905                         if (TDB_PTR_IS_ERR(recovery))
906                                 return TDB_PTR_ERR(recovery);
907                 }
908
909                 recovery_off = create_recovery_area(tdb, recovery_size,
910                                                     recovery);
911                 if (TDB_OFF_IS_ERR(recovery_off)) {
912                         free(recovery);
913                         return recovery_off;
914                 }
915         }
916
917         /* Now we know size, convert rec header. */
918         recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
919         recovery->len = recovery_size;
920         recovery->eof = old_map_size;
921         tdb_convert(tdb, recovery, sizeof(*recovery));
922
923         /* write the recovery data to the recovery area */
924         ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
925         if (ecode != TDB_SUCCESS) {
926                 free(recovery);
927                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
928                                   "tdb_transaction_setup_recovery:"
929                                   " failed to write recovery data");
930         }
931         transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
932
933         free(recovery);
934
935         /* as we don't have ordered writes, we have to sync the recovery
936            data before we update the magic to indicate that the recovery
937            data is present */
938         ecode = transaction_sync(tdb, recovery_off, recovery_size);
939         if (ecode != TDB_SUCCESS)
940                 return ecode;
941
942         magic = TDB_RECOVERY_MAGIC;
943         tdb_convert(tdb, &magic, sizeof(magic));
944
945         tdb->transaction->magic_offset
946                 = recovery_off + offsetof(struct tdb_recovery_record, magic);
947
948         ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
949                                 &magic, sizeof(magic));
950         if (ecode != TDB_SUCCESS) {
951                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
952                                   "tdb_transaction_setup_recovery:"
953                                   " failed to write recovery magic");
954         }
955         transaction_write_existing(tdb, tdb->transaction->magic_offset,
956                                    &magic, sizeof(magic));
957
958         /* ensure the recovery magic marker is on disk */
959         return transaction_sync(tdb, tdb->transaction->magic_offset,
960                                 sizeof(magic));
961 }
962
963 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
964 {
965         const struct tdb_methods *methods;
966         enum TDB_ERROR ecode;
967
968         if (tdb->transaction == NULL) {
969                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
970                                   "tdb_transaction_prepare_commit:"
971                                   " no transaction");
972         }
973
974         if (tdb->transaction->prepared) {
975                 _tdb_transaction_cancel(tdb);
976                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
977                                   "tdb_transaction_prepare_commit:"
978                                   " transaction already prepared");
979         }
980
981         if (tdb->transaction->transaction_error) {
982                 _tdb_transaction_cancel(tdb);
983                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
984                                   "tdb_transaction_prepare_commit:"
985                                   " transaction error pending");
986         }
987
988
989         if (tdb->transaction->nesting != 0) {
990                 return TDB_SUCCESS;
991         }
992
993         /* check for a null transaction */
994         if (tdb->transaction->blocks == NULL) {
995                 return TDB_SUCCESS;
996         }
997
998         methods = tdb->transaction->io_methods;
999
1000         /* upgrade the main transaction lock region to a write lock */
1001         ecode = tdb_allrecord_upgrade(tdb);
1002         if (ecode != TDB_SUCCESS) {
1003                 return ecode;
1004         }
1005
1006         /* get the open lock - this prevents new users attaching to the database
1007            during the commit */
1008         ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
1009         if (ecode != TDB_SUCCESS) {
1010                 return ecode;
1011         }
1012
1013         /* Since we have whole db locked, we don't need the expansion lock. */
1014         if (!(tdb->flags & TDB_NOSYNC)) {
1015                 /* Sets up tdb->transaction->recovery and
1016                  * tdb->transaction->magic_offset. */
1017                 ecode = transaction_setup_recovery(tdb);
1018                 if (ecode != TDB_SUCCESS) {
1019                         return ecode;
1020                 }
1021         }
1022
1023         tdb->transaction->prepared = true;
1024
1025         /* expand the file to the new size if needed */
1026         if (tdb->file->map_size != tdb->transaction->old_map_size) {
1027                 tdb_len_t add;
1028
1029                 add = tdb->file->map_size - tdb->transaction->old_map_size;
1030                 /* Restore original map size for tdb_expand_file */
1031                 tdb->file->map_size = tdb->transaction->old_map_size;
1032                 ecode = methods->expand_file(tdb, add);
1033                 if (ecode != TDB_SUCCESS) {
1034                         return ecode;
1035                 }
1036         }
1037
1038         /* Keep the open lock until the actual commit */
1039         return TDB_SUCCESS;
1040 }
1041
1042 /*
1043    prepare to commit the current transaction
1044 */
1045 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1046 {
1047         return _tdb_transaction_prepare_commit(tdb);
1048 }
1049
1050 /*
1051   commit the current transaction
1052 */
1053 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1054 {
1055         const struct tdb_methods *methods;
1056         int i;
1057         enum TDB_ERROR ecode;
1058
1059         if (tdb->transaction == NULL) {
1060                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1061                                                     TDB_LOG_USE_ERROR,
1062                                                     "tdb_transaction_commit:"
1063                                                     " no transaction");
1064         }
1065
1066         tdb_trace(tdb, "tdb_transaction_commit");
1067
1068         if (tdb->transaction->nesting != 0) {
1069                 tdb->transaction->nesting--;
1070                 return tdb->last_error = TDB_SUCCESS;
1071         }
1072
1073         /* check for a null transaction */
1074         if (tdb->transaction->blocks == NULL) {
1075                 _tdb_transaction_cancel(tdb);
1076                 return tdb->last_error = TDB_SUCCESS;
1077         }
1078
1079         if (!tdb->transaction->prepared) {
1080                 ecode = _tdb_transaction_prepare_commit(tdb);
1081                 if (ecode != TDB_SUCCESS) {
1082                         _tdb_transaction_cancel(tdb);
1083                         return tdb->last_error = ecode;
1084                 }
1085         }
1086
1087         methods = tdb->transaction->io_methods;
1088
1089         /* perform all the writes */
1090         for (i=0;i<tdb->transaction->num_blocks;i++) {
1091                 tdb_off_t offset;
1092                 tdb_len_t length;
1093
1094                 if (tdb->transaction->blocks[i] == NULL) {
1095                         continue;
1096                 }
1097
1098                 offset = i * PAGESIZE;
1099                 length = PAGESIZE;
1100                 if (i == tdb->transaction->num_blocks-1) {
1101                         length = tdb->transaction->last_block_size;
1102                 }
1103
1104                 ecode = methods->twrite(tdb, offset,
1105                                         tdb->transaction->blocks[i], length);
1106                 if (ecode != TDB_SUCCESS) {
1107                         /* we've overwritten part of the data and
1108                            possibly expanded the file, so we need to
1109                            run the crash recovery code */
1110                         tdb->methods = methods;
1111                         tdb_transaction_recover(tdb);
1112
1113                         _tdb_transaction_cancel(tdb);
1114
1115                         return tdb->last_error = ecode;
1116                 }
1117                 SAFE_FREE(tdb->transaction->blocks[i]);
1118         }
1119
1120         SAFE_FREE(tdb->transaction->blocks);
1121         tdb->transaction->num_blocks = 0;
1122
1123         /* ensure the new data is on disk */
1124         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1125         if (ecode != TDB_SUCCESS) {
1126                 return tdb->last_error = ecode;
1127         }
1128
1129         /*
1130           TODO: maybe write to some dummy hdr field, or write to magic
1131           offset without mmap, before the last sync, instead of the
1132           utime() call
1133         */
1134
1135         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1136            don't change the mtime of the file, this means the file may
1137            not be backed up (as tdb rounding to block sizes means that
1138            file size changes are quite rare too). The following forces
1139            mtime changes when a transaction completes */
1140 #if HAVE_UTIME
1141         utime(tdb->name, NULL);
1142 #endif
1143
1144         /* use a transaction cancel to free memory and remove the
1145            transaction locks: it "restores" map_size, too. */
1146         tdb->transaction->old_map_size = tdb->file->map_size;
1147         _tdb_transaction_cancel(tdb);
1148
1149         return tdb->last_error = TDB_SUCCESS;
1150 }
1151
1152
1153 /*
1154   recover from an aborted transaction. Must be called with exclusive
1155   database write access already established (including the open
1156   lock to prevent new processes attaching)
1157 */
1158 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1159 {
1160         tdb_off_t recovery_head, recovery_eof;
1161         unsigned char *data, *p;
1162         struct tdb_recovery_record rec;
1163         enum TDB_ERROR ecode;
1164
1165         /* find the recovery area */
1166         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1167         if (TDB_OFF_IS_ERR(recovery_head)) {
1168                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1169                                   "tdb_transaction_recover:"
1170                                   " failed to read recovery head");
1171         }
1172
1173         if (recovery_head == 0) {
1174                 /* we have never allocated a recovery record */
1175                 return TDB_SUCCESS;
1176         }
1177
1178         /* read the recovery record */
1179         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1180         if (ecode != TDB_SUCCESS) {
1181                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1182                                   "tdb_transaction_recover:"
1183                                   " failed to read recovery record");
1184         }
1185
1186         if (rec.magic != TDB_RECOVERY_MAGIC) {
1187                 /* there is no valid recovery data */
1188                 return TDB_SUCCESS;
1189         }
1190
1191         if (tdb->read_only) {
1192                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1193                                   "tdb_transaction_recover:"
1194                                   " attempt to recover read only database");
1195         }
1196
1197         recovery_eof = rec.eof;
1198
1199         data = (unsigned char *)malloc(rec.len);
1200         if (data == NULL) {
1201                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1202                                   "tdb_transaction_recover:"
1203                                   " failed to allocate recovery data");
1204         }
1205
1206         /* read the full recovery data */
1207         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1208                                     rec.len);
1209         if (ecode != TDB_SUCCESS) {
1210                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1211                                   "tdb_transaction_recover:"
1212                                   " failed to read recovery data");
1213         }
1214
1215         /* recover the file data */
1216         p = data;
1217         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1218                 tdb_off_t ofs;
1219                 tdb_len_t len;
1220                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1221                 memcpy(&ofs, p, sizeof(ofs));
1222                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1223                 p += sizeof(ofs) + sizeof(len);
1224
1225                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1226                 if (ecode != TDB_SUCCESS) {
1227                         free(data);
1228                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1229                                           "tdb_transaction_recover:"
1230                                           " failed to recover %zu bytes"
1231                                           " at offset %zu",
1232                                           (size_t)len, (size_t)ofs);
1233                 }
1234                 p += len;
1235         }
1236
1237         free(data);
1238
1239         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1240         if (ecode != TDB_SUCCESS) {
1241                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1242                                   "tdb_transaction_recover:"
1243                                   " failed to sync recovery");
1244         }
1245
1246         /* if the recovery area is after the recovered eof then remove it */
1247         if (recovery_eof <= recovery_head) {
1248                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1249                                                     recovery),
1250                                       0);
1251                 if (ecode != TDB_SUCCESS) {
1252                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1253                                           "tdb_transaction_recover:"
1254                                           " failed to remove recovery head");
1255                 }
1256         }
1257
1258         /* remove the recovery magic */
1259         ecode = tdb_write_off(tdb,
1260                               recovery_head
1261                               + offsetof(struct tdb_recovery_record, magic),
1262                               TDB_RECOVERY_INVALID_MAGIC);
1263         if (ecode != TDB_SUCCESS) {
1264                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1265                                   "tdb_transaction_recover:"
1266                                   " failed to remove recovery magic");
1267         }
1268
1269         ecode = transaction_sync(tdb, 0, recovery_eof);
1270         if (ecode != TDB_SUCCESS) {
1271                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1272                                   "tdb_transaction_recover:"
1273                                   " failed to sync2 recovery");
1274         }
1275
1276         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1277                    "tdb_transaction_recover: recovered %zu byte database",
1278                    (size_t)recovery_eof);
1279
1280         /* all done */
1281         return TDB_SUCCESS;
1282 }
1283
1284 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1285 {
1286         tdb_off_t recovery_head;
1287         struct tdb_recovery_record rec;
1288         enum TDB_ERROR ecode;
1289
1290         /* find the recovery area */
1291         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1292         if (TDB_OFF_IS_ERR(recovery_head)) {
1293                 return recovery_head;
1294         }
1295
1296         if (recovery_head == 0) {
1297                 /* we have never allocated a recovery record */
1298                 return false;
1299         }
1300
1301         /* read the recovery record */
1302         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1303         if (ecode != TDB_SUCCESS) {
1304                 return ecode;
1305         }
1306
1307         return (rec.magic == TDB_RECOVERY_MAGIC);
1308 }