]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
tdb2: cleanup oob handling.
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Free x and reset it to NULL so the stale pointer cannot be reused or
   freed twice. free(NULL) is a no-op, so no NULL guard is needed. x is
   evaluated more than once and must therefore be free of side effects. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* the original io methods - used to do IOs to the real db */
96         const struct tdb_methods *io_methods;
97
98         /* the list of transaction blocks. When a block is first
99            written to, it gets created in this list */
100         uint8_t **blocks;
101         size_t num_blocks;
102         size_t last_block_size; /* number of valid bytes in the last block */
103
104         /* non-zero when an internal transaction error has
105            occurred. All write operations will then fail until the
106            transaction is ended */
107         int transaction_error;
108
109         /* when inside a transaction we need to keep track of any
110            nested tdb_transaction_start() calls, as these are allowed,
111            but don't create a new transaction */
112         unsigned int nesting;
113
114         /* set when a prepare has already occurred */
115         bool prepared;
116         tdb_off_t magic_offset;
117
118         /* old file size before transaction */
119         tdb_len_t old_map_size;
120 };
121
/* Granularity of the transaction's shadow blocks (64 KiB). It need not
   match the real page size, but we use it for similar reasons. */
#define PAGESIZE (64 * 1024)
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
130                                        void *buf, tdb_len_t len)
131 {
132         size_t blk;
133         enum TDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(tdb, off, buf, len2);
139                 if (ecode != TDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return TDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (tdb->transaction->num_blocks <= blk ||
155             tdb->transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
158                 if (ecode != TDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == tdb->transaction->num_blocks-1) {
166                 if (len > tdb->transaction->last_block_size) {
167                         ecode = TDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % PAGESIZE), len);
174         return TDB_SUCCESS;
175
176 fail:
177         tdb->transaction->transaction_error = 1;
178         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
188                                         const void *buf, tdb_len_t len)
189 {
190         size_t blk;
191         enum TDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (tdb->transaction->prepared) {
195                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(tdb, off, buf, len2);
205                 if (ecode != TDB_SUCCESS) {
206                         return -1;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return TDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (tdb->transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (tdb->transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 tdb->transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
240                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
241                 tdb->transaction->blocks = new_blocks;
242                 tdb->transaction->num_blocks = blk+1;
243                 tdb->transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (tdb->transaction->blocks[blk] == NULL) {
248                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (tdb->transaction->blocks[blk] == NULL) {
250                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (tdb->transaction->old_map_size > blk * PAGESIZE) {
256                         tdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > tdb->transaction->old_map_size) {
258                                 len2 = tdb->transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = tdb->transaction->io_methods->tread(tdb,
261                                         blk * PAGESIZE,
262                                         tdb->transaction->blocks[blk],
263                                         len2);
264                         if (ecode != TDB_SUCCESS) {
265                                 ecode = tdb_logerr(tdb, ecode,
266                                                    TDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(tdb->transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == tdb->transaction->num_blocks-1) {
275                                 tdb->transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(tdb->transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == tdb->transaction->num_blocks-1) {
287                 if (len + off > tdb->transaction->last_block_size) {
288                         tdb->transaction->last_block_size = len + off;
289                 }
290         }
291
292         return TDB_SUCCESS;
293
294 fail:
295         tdb->transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
305                                        const void *buf, tdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 tdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(tdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (tdb->transaction->num_blocks <= blk ||
328             tdb->transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == tdb->transaction->num_blocks-1 &&
333             off + len > tdb->transaction->last_block_size) {
334                 if (off >= tdb->transaction->last_block_size) {
335                         return;
336                 }
337                 len = tdb->transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
349                                       bool probe)
350 {
351         if (len <= tdb->file->map_size || probe) {
352                 return TDB_SUCCESS;
353         }
354
355         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
356                    "tdb_oob len %lld beyond transaction size %lld",
357                    (long long)len,
358                    (long long)tdb->file->map_size);
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
373         if (ecode == TDB_SUCCESS) {
374                 tdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 tdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= tdb->transaction->num_blocks
392                     || tdb->transaction->blocks[blk] == NULL) {
393                         tdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         tdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < tdb->transaction->num_blocks
403             && tdb->transaction->blocks[blk])
404                 return tdb->transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= tdb->transaction->num_blocks)
409                         break;
410                 if (tdb->transaction->blocks[blk]) {
411                         tdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return tdb->transaction->io_methods->direct(tdb, off, len, false);
417 }
418
419 static const struct tdb_methods transaction_methods = {
420         transaction_read,
421         transaction_write,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_direct,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
431                                        tdb_off_t offset, tdb_len_t length)
432 {
433         if (tdb->flags & TDB_NOSYNC) {
434                 return TDB_SUCCESS;
435         }
436
437         if (fsync(tdb->file->fd) != 0) {
438                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
439                                   "tdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (tdb->file->map_ptr) {
444                 tdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)tdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
448                                           "tdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return TDB_SUCCESS;
454 }
455
456
457 static void _tdb_transaction_cancel(struct tdb_context *tdb)
458 {
459         int i;
460         enum TDB_ERROR ecode;
461
462         if (tdb->transaction == NULL) {
463                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
464                            "tdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (tdb->transaction->nesting != 0) {
469                 tdb->transaction->transaction_error = 1;
470                 tdb->transaction->nesting--;
471                 return;
472         }
473
474         tdb->file->map_size = tdb->transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<tdb->transaction->num_blocks;i++) {
478                 if (tdb->transaction->blocks[i] != NULL) {
479                         free(tdb->transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(tdb->transaction->blocks);
483
484         if (tdb->transaction->magic_offset) {
485                 const struct tdb_methods *methods = tdb->transaction->io_methods;
486                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == TDB_SUCCESS)
492                         ecode = transaction_sync(tdb,
493                                                  tdb->transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != TDB_SUCCESS) {
496                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
497                                    "tdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (tdb->file->allrecord_lock.count)
503                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         tdb->methods = tdb->transaction->io_methods;
507
508         tdb_transaction_unlock(tdb, F_WRLCK);
509
510         if (tdb_has_open_lock(tdb))
511                 tdb_unlock_open(tdb, F_WRLCK);
512
513         SAFE_FREE(tdb->transaction);
514 }
515
516 /*
517   start a tdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per tdb_context
519 */
520 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
521 {
522         enum TDB_ERROR ecode;
523
524         tdb->stats.transactions++;
525         /* some sanity checks */
526         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
527                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
528                                                     TDB_LOG_USE_ERROR,
529                                                     "tdb_transaction_start:"
530                                                     " cannot start a"
531                                                     " transaction on a "
532                                                     "read-only or internal db");
533         }
534
535         /* cope with nested tdb_transaction_start() calls */
536         if (tdb->transaction != NULL) {
537                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
538                         return tdb->last_error
539                                 = tdb_logerr(tdb, TDB_ERR_IO,
540                                              TDB_LOG_USE_ERROR,
541                                              "tdb_transaction_start:"
542                                              " already inside transaction");
543                 }
544                 tdb->transaction->nesting++;
545                 tdb->stats.transaction_nest++;
546                 return 0;
547         }
548
549         if (tdb_has_hash_locks(tdb)) {
550                 /* the caller must not have any locks when starting a
551                    transaction as otherwise we'll be screwed by lack
552                    of nested locks in POSIX */
553                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
554                                                     TDB_LOG_USE_ERROR,
555                                                     "tdb_transaction_start:"
556                                                     " cannot start a"
557                                                     " transaction with locks"
558                                                     " held");
559         }
560
561         tdb->transaction = (struct tdb_transaction *)
562                 calloc(sizeof(struct tdb_transaction), 1);
563         if (tdb->transaction == NULL) {
564                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
565                                                     TDB_LOG_ERROR,
566                                                     "tdb_transaction_start:"
567                                                     " cannot allocate");
568         }
569
570         /* get the transaction write lock. This is a blocking lock. As
571            discussed with Volker, there are a number of ways we could
572            make this async, which we will probably do in the future */
573         ecode = tdb_transaction_lock(tdb, F_WRLCK);
574         if (ecode != TDB_SUCCESS) {
575                 SAFE_FREE(tdb->transaction->blocks);
576                 SAFE_FREE(tdb->transaction);
577                 return tdb->last_error = ecode;
578         }
579
580         /* get a read lock over entire file. This is upgraded to a write
581            lock during the commit */
582         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
583         if (ecode != TDB_SUCCESS) {
584                 goto fail_allrecord_lock;
585         }
586
587         /* make sure we know about any file expansions already done by
588            anyone else */
589         tdb->methods->oob(tdb, tdb->file->map_size + 1, true);
590         tdb->transaction->old_map_size = tdb->file->map_size;
591
592         /* finally hook the io methods, replacing them with
593            transaction specific methods */
594         tdb->transaction->io_methods = tdb->methods;
595         tdb->methods = &transaction_methods;
596         return tdb->last_error = TDB_SUCCESS;
597
598 fail_allrecord_lock:
599         tdb_transaction_unlock(tdb, F_WRLCK);
600         SAFE_FREE(tdb->transaction->blocks);
601         SAFE_FREE(tdb->transaction);
602         return tdb->last_error = ecode;
603 }
604
605
606 /*
607   cancel the current transaction
608 */
609 void tdb_transaction_cancel(struct tdb_context *tdb)
610 {
611         tdb->stats.transaction_cancel++;
612         _tdb_transaction_cancel(tdb);
613 }
614
615 /*
616   work out how much space the linearised recovery data will consume (worst case)
617 */
618 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
619 {
620         tdb_len_t recovery_size = 0;
621         int i;
622
623         recovery_size = 0;
624         for (i=0;i<tdb->transaction->num_blocks;i++) {
625                 if (i * PAGESIZE >= tdb->transaction->old_map_size) {
626                         break;
627                 }
628                 if (tdb->transaction->blocks[i] == NULL) {
629                         continue;
630                 }
631                 recovery_size += 2*sizeof(tdb_off_t);
632                 if (i == tdb->transaction->num_blocks-1) {
633                         recovery_size += tdb->transaction->last_block_size;
634                 } else {
635                         recovery_size += PAGESIZE;
636                 }
637         }
638
639         return recovery_size;
640 }
641
642 static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
643                                         const struct tdb_methods *methods,
644                                         tdb_off_t *recovery_offset,
645                                         struct tdb_recovery_record *rec)
646 {
647         enum TDB_ERROR ecode;
648
649         *recovery_offset = tdb_read_off(tdb,
650                                         offsetof(struct tdb_header, recovery));
651         if (TDB_OFF_IS_ERR(*recovery_offset)) {
652                 return *recovery_offset;
653         }
654
655         if (*recovery_offset == 0) {
656                 rec->max_len = 0;
657                 return TDB_SUCCESS;
658         }
659
660         ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
661         if (ecode != TDB_SUCCESS)
662                 return ecode;
663
664         tdb_convert(tdb, rec, sizeof(*rec));
665         /* ignore invalid recovery regions: can happen in crash */
666         if (rec->magic != TDB_RECOVERY_MAGIC &&
667             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
668                 *recovery_offset = 0;
669                 rec->max_len = 0;
670         }
671         return TDB_SUCCESS;
672 }
673
/* Length of the common prefix of new and old, looking at no more than
   length bytes. */
static unsigned int same(const unsigned char *new,
                         const unsigned char *old,
                         unsigned int length)
{
        unsigned int matched = 0;

        while (matched < length && new[matched] == old[matched]) {
                matched++;
        }
        return matched;
}
686
/* Scan new against old and return how many leading bytes belong to the
   "different" region: everything up to the first run of at least
   min_same equal bytes that is followed by a differing byte or by the
   end of the buffer. *samelen receives the length of that equal run,
   or 0 if the buffer ends without one (the return is then length). */
static unsigned int different(const unsigned char *new,
                              const unsigned char *old,
                              unsigned int length,
                              unsigned int min_same,
                              unsigned int *samelen)
{
        unsigned int pos = 0;
        unsigned int run = 0;   /* length of the current run of equal bytes */

        while (pos < length) {
                if (new[pos] == old[pos]) {
                        run++;
                } else if (run >= min_same) {
                        /* a long enough equal run has just ended */
                        *samelen = run;
                        return pos - run;
                } else {
                        run = 0;
                }
                pos++;
        }

        /* a trailing equal run only counts if it is long enough */
        *samelen = (run >= min_same) ? run : 0;
        return length - *samelen;
}
711
712 /* Allocates recovery blob, without tdb_recovery_record at head set up. */
713 static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
714                                                   tdb_len_t *len)
715 {
716         struct tdb_recovery_record *rec;
717         size_t i;
718         enum TDB_ERROR ecode;
719         unsigned char *p;
720         const struct tdb_methods *old_methods = tdb->methods;
721
722         rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
723         if (!rec) {
724                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
725                            "transaction_setup_recovery:"
726                            " cannot allocate");
727                 return TDB_ERR_PTR(TDB_ERR_OOM);
728         }
729
730         /* We temporarily revert to the old I/O methods, so we can use
731          * tdb_access_read */
732         tdb->methods = tdb->transaction->io_methods;
733
734         /* build the recovery data into a single blob to allow us to do a single
735            large write, which should be more efficient */
736         p = (unsigned char *)(rec + 1);
737         for (i=0;i<tdb->transaction->num_blocks;i++) {
738                 tdb_off_t offset;
739                 tdb_len_t length;
740                 unsigned int off;
741                 const unsigned char *buffer;
742
743                 if (tdb->transaction->blocks[i] == NULL) {
744                         continue;
745                 }
746
747                 offset = i * PAGESIZE;
748                 length = PAGESIZE;
749                 if (i == tdb->transaction->num_blocks-1) {
750                         length = tdb->transaction->last_block_size;
751                 }
752
753                 if (offset >= tdb->transaction->old_map_size) {
754                         continue;
755                 }
756
757                 if (offset + length > tdb->file->map_size) {
758                         ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
759                                            "tdb_transaction_setup_recovery:"
760                                            " transaction data over new region"
761                                            " boundary");
762                         goto fail;
763                 }
764                 if (offset + length > tdb->transaction->old_map_size) {
765                         /* Short read at EOF. */
766                         length = tdb->transaction->old_map_size - offset;
767                 }
768                 buffer = tdb_access_read(tdb, offset, length, false);
769                 if (TDB_PTR_IS_ERR(buffer)) {
770                         ecode = TDB_PTR_ERR(buffer);
771                         goto fail;
772                 }
773
774                 /* Skip over anything the same at the start. */
775                 off = same(tdb->transaction->blocks[i], buffer, length);
776                 offset += off;
777
778                 while (off < length) {
779                         tdb_len_t len;
780                         unsigned int samelen;
781
782                         len = different(tdb->transaction->blocks[i] + off,
783                                         buffer + off, length - off,
784                                         sizeof(offset) + sizeof(len) + 1,
785                                         &samelen);
786
787                         memcpy(p, &offset, sizeof(offset));
788                         memcpy(p + sizeof(offset), &len, sizeof(len));
789                         tdb_convert(tdb, p, sizeof(offset) + sizeof(len));
790                         p += sizeof(offset) + sizeof(len);
791                         memcpy(p, buffer + off, len);
792                         p += len;
793                         off += len + samelen;
794                         offset += len + samelen;
795                 }
796                 tdb_access_release(tdb, buffer);
797         }
798
799         *len = p - (unsigned char *)(rec + 1);
800         tdb->methods = old_methods;
801         return rec;
802
803 fail:
804         free(rec);
805         tdb->methods = old_methods;
806         return TDB_ERR_PTR(ecode);
807 }
808
809 static tdb_off_t create_recovery_area(struct tdb_context *tdb,
810                                       tdb_len_t rec_length,
811                                       struct tdb_recovery_record *rec)
812 {
813         tdb_off_t off, recovery_off;
814         tdb_len_t addition;
815         enum TDB_ERROR ecode;
816         const struct tdb_methods *methods = tdb->transaction->io_methods;
817
818         /* round up to a multiple of page size. Overallocate, since each
819          * such allocation forces us to expand the file. */
820         rec->max_len
821                 = (((sizeof(*rec) + rec_length + rec_length / 2)
822                     + PAGESIZE-1) & ~(PAGESIZE-1))
823                 - sizeof(*rec);
824         off = tdb->file->map_size;
825
826         /* Restore ->map_size before calling underlying expand_file.
827            Also so that we don't try to expand the file again in the
828            transaction commit, which would destroy the recovery
829            area */
830         addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
831                 sizeof(*rec) + rec->max_len;
832         tdb->file->map_size = tdb->transaction->old_map_size;
833         tdb->stats.transaction_expand_file++;
834         ecode = methods->expand_file(tdb, addition);
835         if (ecode != TDB_SUCCESS) {
836                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
837                                   "tdb_recovery_allocate:"
838                                   " failed to create recovery area");
839         }
840
841         /* we have to reset the old map size so that we don't try to
842            expand the file again in the transaction commit, which
843            would destroy the recovery area */
844         tdb->transaction->old_map_size = tdb->file->map_size;
845
846         /* write the recovery header offset and sync - we can sync without a race here
847            as the magic ptr in the recovery record has not been set */
848         recovery_off = off;
849         tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
850         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
851                                 &recovery_off, sizeof(tdb_off_t));
852         if (ecode != TDB_SUCCESS) {
853                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
854                                   "tdb_recovery_allocate:"
855                                   " failed to write recovery head");
856         }
857         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
858                                    &recovery_off,
859                                    sizeof(tdb_off_t));
860         return off;
861 }
862
863 /*
864   setup the recovery data that will be used on a crash during commit
865 */
866 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
867 {
868         tdb_len_t recovery_size = 0;
869         tdb_off_t recovery_off = 0;
870         tdb_off_t old_map_size = tdb->transaction->old_map_size;
871         struct tdb_recovery_record *recovery;
872         const struct tdb_methods *methods = tdb->transaction->io_methods;
873         uint64_t magic;
874         enum TDB_ERROR ecode;
875
876         recovery = alloc_recovery(tdb, &recovery_size);
877         if (TDB_PTR_IS_ERR(recovery))
878                 return TDB_PTR_ERR(recovery);
879
880         ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
881         if (ecode) {
882                 free(recovery);
883                 return ecode;
884         }
885
886         if (recovery->max_len < recovery_size) {
887                 /* Not large enough. Free up old recovery area. */
888                 if (recovery_off) {
889                         tdb->stats.frees++;
890                         ecode = add_free_record(tdb, recovery_off,
891                                                 sizeof(*recovery)
892                                                 + recovery->max_len,
893                                                 TDB_LOCK_WAIT, true);
894                         free(recovery);
895                         if (ecode != TDB_SUCCESS) {
896                                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
897                                                   "tdb_recovery_allocate:"
898                                                   " failed to free previous"
899                                                   " recovery area");
900                         }
901
902                         /* Refresh recovery after add_free_record above. */
903                         recovery = alloc_recovery(tdb, &recovery_size);
904                         if (TDB_PTR_IS_ERR(recovery))
905                                 return TDB_PTR_ERR(recovery);
906                 }
907
908                 recovery_off = create_recovery_area(tdb, recovery_size,
909                                                     recovery);
910                 if (TDB_OFF_IS_ERR(recovery_off)) {
911                         free(recovery);
912                         return recovery_off;
913                 }
914         }
915
916         /* Now we know size, convert rec header. */
917         recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
918         recovery->len = recovery_size;
919         recovery->eof = old_map_size;
920         tdb_convert(tdb, recovery, sizeof(*recovery));
921
922         /* write the recovery data to the recovery area */
923         ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
924         if (ecode != TDB_SUCCESS) {
925                 free(recovery);
926                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
927                                   "tdb_transaction_setup_recovery:"
928                                   " failed to write recovery data");
929         }
930         transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
931
932         free(recovery);
933
934         /* as we don't have ordered writes, we have to sync the recovery
935            data before we update the magic to indicate that the recovery
936            data is present */
937         ecode = transaction_sync(tdb, recovery_off, recovery_size);
938         if (ecode != TDB_SUCCESS)
939                 return ecode;
940
941         magic = TDB_RECOVERY_MAGIC;
942         tdb_convert(tdb, &magic, sizeof(magic));
943
944         tdb->transaction->magic_offset
945                 = recovery_off + offsetof(struct tdb_recovery_record, magic);
946
947         ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
948                                 &magic, sizeof(magic));
949         if (ecode != TDB_SUCCESS) {
950                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
951                                   "tdb_transaction_setup_recovery:"
952                                   " failed to write recovery magic");
953         }
954         transaction_write_existing(tdb, tdb->transaction->magic_offset,
955                                    &magic, sizeof(magic));
956
957         /* ensure the recovery magic marker is on disk */
958         return transaction_sync(tdb, tdb->transaction->magic_offset,
959                                 sizeof(magic));
960 }
961
962 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
963 {
964         const struct tdb_methods *methods;
965         enum TDB_ERROR ecode;
966
967         if (tdb->transaction == NULL) {
968                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
969                                   "tdb_transaction_prepare_commit:"
970                                   " no transaction");
971         }
972
973         if (tdb->transaction->prepared) {
974                 _tdb_transaction_cancel(tdb);
975                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
976                                   "tdb_transaction_prepare_commit:"
977                                   " transaction already prepared");
978         }
979
980         if (tdb->transaction->transaction_error) {
981                 _tdb_transaction_cancel(tdb);
982                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
983                                   "tdb_transaction_prepare_commit:"
984                                   " transaction error pending");
985         }
986
987
988         if (tdb->transaction->nesting != 0) {
989                 return TDB_SUCCESS;
990         }
991
992         /* check for a null transaction */
993         if (tdb->transaction->blocks == NULL) {
994                 return TDB_SUCCESS;
995         }
996
997         methods = tdb->transaction->io_methods;
998
999         /* upgrade the main transaction lock region to a write lock */
1000         ecode = tdb_allrecord_upgrade(tdb);
1001         if (ecode != TDB_SUCCESS) {
1002                 return ecode;
1003         }
1004
1005         /* get the open lock - this prevents new users attaching to the database
1006            during the commit */
1007         ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
1008         if (ecode != TDB_SUCCESS) {
1009                 return ecode;
1010         }
1011
1012         /* Since we have whole db locked, we don't need the expansion lock. */
1013         if (!(tdb->flags & TDB_NOSYNC)) {
1014                 /* Sets up tdb->transaction->recovery and
1015                  * tdb->transaction->magic_offset. */
1016                 ecode = transaction_setup_recovery(tdb);
1017                 if (ecode != TDB_SUCCESS) {
1018                         return ecode;
1019                 }
1020         }
1021
1022         tdb->transaction->prepared = true;
1023
1024         /* expand the file to the new size if needed */
1025         if (tdb->file->map_size != tdb->transaction->old_map_size) {
1026                 tdb_len_t add;
1027
1028                 add = tdb->file->map_size - tdb->transaction->old_map_size;
1029                 /* Restore original map size for tdb_expand_file */
1030                 tdb->file->map_size = tdb->transaction->old_map_size;
1031                 ecode = methods->expand_file(tdb, add);
1032                 if (ecode != TDB_SUCCESS) {
1033                         return ecode;
1034                 }
1035         }
1036
1037         /* Keep the open lock until the actual commit */
1038         return TDB_SUCCESS;
1039 }
1040
1041 /*
1042    prepare to commit the current transaction
1043 */
1044 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
1045 {
1046         return _tdb_transaction_prepare_commit(tdb);
1047 }
1048
1049 /*
1050   commit the current transaction
1051 */
1052 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
1053 {
1054         const struct tdb_methods *methods;
1055         int i;
1056         enum TDB_ERROR ecode;
1057
1058         if (tdb->transaction == NULL) {
1059                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
1060                                                     TDB_LOG_USE_ERROR,
1061                                                     "tdb_transaction_commit:"
1062                                                     " no transaction");
1063         }
1064
1065         tdb_trace(tdb, "tdb_transaction_commit");
1066
1067         if (tdb->transaction->nesting != 0) {
1068                 tdb->transaction->nesting--;
1069                 return tdb->last_error = TDB_SUCCESS;
1070         }
1071
1072         /* check for a null transaction */
1073         if (tdb->transaction->blocks == NULL) {
1074                 _tdb_transaction_cancel(tdb);
1075                 return tdb->last_error = TDB_SUCCESS;
1076         }
1077
1078         if (!tdb->transaction->prepared) {
1079                 ecode = _tdb_transaction_prepare_commit(tdb);
1080                 if (ecode != TDB_SUCCESS) {
1081                         _tdb_transaction_cancel(tdb);
1082                         return tdb->last_error = ecode;
1083                 }
1084         }
1085
1086         methods = tdb->transaction->io_methods;
1087
1088         /* perform all the writes */
1089         for (i=0;i<tdb->transaction->num_blocks;i++) {
1090                 tdb_off_t offset;
1091                 tdb_len_t length;
1092
1093                 if (tdb->transaction->blocks[i] == NULL) {
1094                         continue;
1095                 }
1096
1097                 offset = i * PAGESIZE;
1098                 length = PAGESIZE;
1099                 if (i == tdb->transaction->num_blocks-1) {
1100                         length = tdb->transaction->last_block_size;
1101                 }
1102
1103                 ecode = methods->twrite(tdb, offset,
1104                                         tdb->transaction->blocks[i], length);
1105                 if (ecode != TDB_SUCCESS) {
1106                         /* we've overwritten part of the data and
1107                            possibly expanded the file, so we need to
1108                            run the crash recovery code */
1109                         tdb->methods = methods;
1110                         tdb_transaction_recover(tdb);
1111
1112                         _tdb_transaction_cancel(tdb);
1113
1114                         return tdb->last_error = ecode;
1115                 }
1116                 SAFE_FREE(tdb->transaction->blocks[i]);
1117         }
1118
1119         SAFE_FREE(tdb->transaction->blocks);
1120         tdb->transaction->num_blocks = 0;
1121
1122         /* ensure the new data is on disk */
1123         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1124         if (ecode != TDB_SUCCESS) {
1125                 return tdb->last_error = ecode;
1126         }
1127
1128         /*
1129           TODO: maybe write to some dummy hdr field, or write to magic
1130           offset without mmap, before the last sync, instead of the
1131           utime() call
1132         */
1133
1134         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1135            don't change the mtime of the file, this means the file may
1136            not be backed up (as tdb rounding to block sizes means that
1137            file size changes are quite rare too). The following forces
1138            mtime changes when a transaction completes */
1139 #if HAVE_UTIME
1140         utime(tdb->name, NULL);
1141 #endif
1142
1143         /* use a transaction cancel to free memory and remove the
1144            transaction locks: it "restores" map_size, too. */
1145         tdb->transaction->old_map_size = tdb->file->map_size;
1146         _tdb_transaction_cancel(tdb);
1147
1148         return tdb->last_error = TDB_SUCCESS;
1149 }
1150
1151
1152 /*
1153   recover from an aborted transaction. Must be called with exclusive
1154   database write access already established (including the open
1155   lock to prevent new processes attaching)
1156 */
1157 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1158 {
1159         tdb_off_t recovery_head, recovery_eof;
1160         unsigned char *data, *p;
1161         struct tdb_recovery_record rec;
1162         enum TDB_ERROR ecode;
1163
1164         /* find the recovery area */
1165         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1166         if (TDB_OFF_IS_ERR(recovery_head)) {
1167                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1168                                   "tdb_transaction_recover:"
1169                                   " failed to read recovery head");
1170         }
1171
1172         if (recovery_head == 0) {
1173                 /* we have never allocated a recovery record */
1174                 return TDB_SUCCESS;
1175         }
1176
1177         /* read the recovery record */
1178         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1179         if (ecode != TDB_SUCCESS) {
1180                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1181                                   "tdb_transaction_recover:"
1182                                   " failed to read recovery record");
1183         }
1184
1185         if (rec.magic != TDB_RECOVERY_MAGIC) {
1186                 /* there is no valid recovery data */
1187                 return TDB_SUCCESS;
1188         }
1189
1190         if (tdb->read_only) {
1191                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1192                                   "tdb_transaction_recover:"
1193                                   " attempt to recover read only database");
1194         }
1195
1196         recovery_eof = rec.eof;
1197
1198         data = (unsigned char *)malloc(rec.len);
1199         if (data == NULL) {
1200                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1201                                   "tdb_transaction_recover:"
1202                                   " failed to allocate recovery data");
1203         }
1204
1205         /* read the full recovery data */
1206         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1207                                     rec.len);
1208         if (ecode != TDB_SUCCESS) {
1209                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1210                                   "tdb_transaction_recover:"
1211                                   " failed to read recovery data");
1212         }
1213
1214         /* recover the file data */
1215         p = data;
1216         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1217                 tdb_off_t ofs;
1218                 tdb_len_t len;
1219                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1220                 memcpy(&ofs, p, sizeof(ofs));
1221                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1222                 p += sizeof(ofs) + sizeof(len);
1223
1224                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1225                 if (ecode != TDB_SUCCESS) {
1226                         free(data);
1227                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1228                                           "tdb_transaction_recover:"
1229                                           " failed to recover %zu bytes"
1230                                           " at offset %zu",
1231                                           (size_t)len, (size_t)ofs);
1232                 }
1233                 p += len;
1234         }
1235
1236         free(data);
1237
1238         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1239         if (ecode != TDB_SUCCESS) {
1240                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1241                                   "tdb_transaction_recover:"
1242                                   " failed to sync recovery");
1243         }
1244
1245         /* if the recovery area is after the recovered eof then remove it */
1246         if (recovery_eof <= recovery_head) {
1247                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1248                                                     recovery),
1249                                       0);
1250                 if (ecode != TDB_SUCCESS) {
1251                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1252                                           "tdb_transaction_recover:"
1253                                           " failed to remove recovery head");
1254                 }
1255         }
1256
1257         /* remove the recovery magic */
1258         ecode = tdb_write_off(tdb,
1259                               recovery_head
1260                               + offsetof(struct tdb_recovery_record, magic),
1261                               TDB_RECOVERY_INVALID_MAGIC);
1262         if (ecode != TDB_SUCCESS) {
1263                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1264                                   "tdb_transaction_recover:"
1265                                   " failed to remove recovery magic");
1266         }
1267
1268         ecode = transaction_sync(tdb, 0, recovery_eof);
1269         if (ecode != TDB_SUCCESS) {
1270                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1271                                   "tdb_transaction_recover:"
1272                                   " failed to sync2 recovery");
1273         }
1274
1275         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1276                    "tdb_transaction_recover: recovered %zu byte database",
1277                    (size_t)recovery_eof);
1278
1279         /* all done */
1280         return TDB_SUCCESS;
1281 }
1282
1283 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1284 {
1285         tdb_off_t recovery_head;
1286         struct tdb_recovery_record rec;
1287         enum TDB_ERROR ecode;
1288
1289         /* find the recovery area */
1290         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1291         if (TDB_OFF_IS_ERR(recovery_head)) {
1292                 return recovery_head;
1293         }
1294
1295         if (recovery_head == 0) {
1296                 /* we have never allocated a recovery record */
1297                 return false;
1298         }
1299
1300         /* read the recovery record */
1301         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1302         if (ecode != TDB_SUCCESS) {
1303                 return ecode;
1304         }
1305
1306         return (rec.magic == TDB_RECOVERY_MAGIC);
1307 }