]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/transaction.c
read_write_all: avoid arithmetic on void pointers.
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Release x with free() and reset it to NULL so the stale pointer cannot be
   reused or freed a second time.  free(NULL) is a no-op, so no NULL guard is
   needed.  x is evaluated more than once: pass a plain lvalue. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of writes all that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off,
129                                        void *buf, tdb_len_t len)
130 {
131         size_t blk;
132         enum TDB_ERROR ecode;
133
134         /* break it down into block sized ops */
135         while (len + (off % getpagesize()) > getpagesize()) {
136                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
137                 ecode = transaction_read(tdb, off, buf, len2);
138                 if (ecode != TDB_SUCCESS) {
139                         return ecode;
140                 }
141                 len -= len2;
142                 off += len2;
143                 buf = (void *)(len2 + (char *)buf);
144         }
145
146         if (len == 0) {
147                 return TDB_SUCCESS;
148         }
149
150         blk = off / getpagesize();
151
152         /* see if we have it in the block list */
153         if (tdb->transaction->num_blocks <= blk ||
154             tdb->transaction->blocks[blk] == NULL) {
155                 /* nope, do a real read */
156                 ecode = tdb->transaction->io_methods->tread(tdb, off, buf, len);
157                 if (ecode != TDB_SUCCESS) {
158                         goto fail;
159                 }
160                 return 0;
161         }
162
163         /* it is in the block list. Now check for the last block */
164         if (blk == tdb->transaction->num_blocks-1) {
165                 if (len > tdb->transaction->last_block_size) {
166                         ecode = TDB_ERR_IO;
167                         goto fail;
168                 }
169         }
170
171         /* now copy it out of this block */
172         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
173         return TDB_SUCCESS;
174
175 fail:
176         tdb->transaction->transaction_error = 1;
177         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
178                           "transaction_read: failed at off=%zu len=%zu",
179                           (size_t)off, (size_t)len);
180 }
181
182
183 /*
184   write while in a transaction
185 */
186 static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off,
187                                         const void *buf, tdb_len_t len)
188 {
189         size_t blk;
190         enum TDB_ERROR ecode;
191
192         /* Only a commit is allowed on a prepared transaction */
193         if (tdb->transaction->prepared) {
194                 ecode = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
195                                    "transaction_write: transaction already"
196                                    " prepared, write not allowed");
197                 goto fail;
198         }
199
200         /* break it up into block sized chunks */
201         while (len + (off % getpagesize()) > getpagesize()) {
202                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
203                 ecode = transaction_write(tdb, off, buf, len2);
204                 if (ecode != TDB_SUCCESS) {
205                         return -1;
206                 }
207                 len -= len2;
208                 off += len2;
209                 if (buf != NULL) {
210                         buf = (const void *)(len2 + (const char *)buf);
211                 }
212         }
213
214         if (len == 0) {
215                 return TDB_SUCCESS;
216         }
217
218         blk = off / getpagesize();
219         off = off % getpagesize();
220
221         if (tdb->transaction->num_blocks <= blk) {
222                 uint8_t **new_blocks;
223                 /* expand the blocks array */
224                 if (tdb->transaction->blocks == NULL) {
225                         new_blocks = (uint8_t **)malloc(
226                                 (blk+1)*sizeof(uint8_t *));
227                 } else {
228                         new_blocks = (uint8_t **)realloc(
229                                 tdb->transaction->blocks,
230                                 (blk+1)*sizeof(uint8_t *));
231                 }
232                 if (new_blocks == NULL) {
233                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
234                                            "transaction_write:"
235                                            " failed to allocate");
236                         goto fail;
237                 }
238                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
239                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240                 tdb->transaction->blocks = new_blocks;
241                 tdb->transaction->num_blocks = blk+1;
242                 tdb->transaction->last_block_size = 0;
243         }
244
245         /* allocate and fill a block? */
246         if (tdb->transaction->blocks[blk] == NULL) {
247                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
248                 if (tdb->transaction->blocks[blk] == NULL) {
249                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
250                                            "transaction_write:"
251                                            " failed to allocate");
252                         goto fail;
253                 }
254                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
255                         tdb_len_t len2 = getpagesize();
256                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
257                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
258                         }
259                         ecode = tdb->transaction->io_methods->tread(tdb,
260                                         blk * getpagesize(),
261                                         tdb->transaction->blocks[blk],
262                                         len2);
263                         if (ecode != TDB_SUCCESS) {
264                                 ecode = tdb_logerr(tdb, ecode,
265                                                    TDB_LOG_ERROR,
266                                                    "transaction_write:"
267                                                    " failed to"
268                                                    " read old block: %s",
269                                                    strerror(errno));
270                                 SAFE_FREE(tdb->transaction->blocks[blk]);
271                                 goto fail;
272                         }
273                         if (blk == tdb->transaction->num_blocks-1) {
274                                 tdb->transaction->last_block_size = len2;
275                         }
276                 }
277         }
278
279         /* overwrite part of an existing block */
280         if (buf == NULL) {
281                 memset(tdb->transaction->blocks[blk] + off, 0, len);
282         } else {
283                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
284         }
285         if (blk == tdb->transaction->num_blocks-1) {
286                 if (len + off > tdb->transaction->last_block_size) {
287                         tdb->transaction->last_block_size = len + off;
288                 }
289         }
290
291         return TDB_SUCCESS;
292
293 fail:
294         tdb->transaction->transaction_error = 1;
295         return ecode;
296 }
297
298
299 /*
300   write while in a transaction - this variant never expands the transaction blocks, it only
301   updates existing blocks. This means it cannot change the recovery size
302 */
303 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
304                                        const void *buf, tdb_len_t len)
305 {
306         size_t blk;
307
308         /* break it up into block sized chunks */
309         while (len + (off % getpagesize()) > getpagesize()) {
310                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
311                 transaction_write_existing(tdb, off, buf, len2);
312                 len -= len2;
313                 off += len2;
314                 if (buf != NULL) {
315                         buf = (const void *)(len2 + (const char *)buf);
316                 }
317         }
318
319         if (len == 0) {
320                 return;
321         }
322
323         blk = off / getpagesize();
324         off = off % getpagesize();
325
326         if (tdb->transaction->num_blocks <= blk ||
327             tdb->transaction->blocks[blk] == NULL) {
328                 return;
329         }
330
331         if (blk == tdb->transaction->num_blocks-1 &&
332             off + len > tdb->transaction->last_block_size) {
333                 if (off >= tdb->transaction->last_block_size) {
334                         return;
335                 }
336                 len = tdb->transaction->last_block_size - off;
337         }
338
339         /* overwrite part of an existing block */
340         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
341 }
342
343
344 /*
345   out of bounds check during a transaction
346 */
347 static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
348                                       bool probe)
349 {
350         if (len <= tdb->file->map_size) {
351                 return TDB_SUCCESS;
352         }
353         if (!probe) {
354                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
355                            "tdb_oob len %lld beyond transaction size %lld",
356                            (long long)len,
357                            (long long)tdb->file->map_size);
358         }
359         return TDB_ERR_IO;
360 }
361
362 /*
363   transaction version of tdb_expand().
364 */
365 static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb,
366                                               tdb_off_t addition)
367 {
368         enum TDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition);
373         if (ecode == TDB_SUCCESS) {
374                 tdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / getpagesize(), end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / getpagesize();
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 if (blk != end_blk)
390                         return NULL;
391                 if (blk >= tdb->transaction->num_blocks)
392                         return NULL;
393                 if (tdb->transaction->blocks[blk] == NULL)
394                         return NULL;
395                 return tdb->transaction->blocks[blk] + off % getpagesize();
396         }
397
398         /* Single which we have copied? */
399         if (blk == end_blk
400             && blk < tdb->transaction->num_blocks
401             && tdb->transaction->blocks[blk])
402                 return tdb->transaction->blocks[blk] + off % getpagesize();
403
404         /* Otherwise must be all not copied. */
405         while (blk < end_blk) {
406                 if (blk >= tdb->transaction->num_blocks)
407                         break;
408                 if (tdb->transaction->blocks[blk])
409                         return NULL;
410                 blk++;
411         }
412         return tdb->transaction->io_methods->direct(tdb, off, len, false);
413 }
414
415 static const struct tdb_methods transaction_methods = {
416         transaction_read,
417         transaction_write,
418         transaction_oob,
419         transaction_expand_file,
420         transaction_direct,
421 };
422
423 /*
424   sync to disk
425 */
426 static enum TDB_ERROR transaction_sync(struct tdb_context *tdb,
427                                        tdb_off_t offset, tdb_len_t length)
428 {
429         if (tdb->flags & TDB_NOSYNC) {
430                 return TDB_SUCCESS;
431         }
432
433         if (fsync(tdb->file->fd) != 0) {
434                 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
435                                   "tdb_transaction: fsync failed: %s",
436                                   strerror(errno));
437         }
438 #ifdef MS_SYNC
439         if (tdb->file->map_ptr) {
440                 tdb_off_t moffset = offset & ~(getpagesize()-1);
441                 if (msync(moffset + (char *)tdb->file->map_ptr,
442                           length + (offset - moffset), MS_SYNC) != 0) {
443                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
444                                           "tdb_transaction: msync failed: %s",
445                                           strerror(errno));
446                 }
447         }
448 #endif
449         return TDB_SUCCESS;
450 }
451
452
453 static void _tdb_transaction_cancel(struct tdb_context *tdb)
454 {
455         int i;
456         enum TDB_ERROR ecode;
457
458         if (tdb->transaction == NULL) {
459                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
460                            "tdb_transaction_cancel: no transaction");
461                 return;
462         }
463
464         if (tdb->transaction->nesting != 0) {
465                 tdb->transaction->transaction_error = 1;
466                 tdb->transaction->nesting--;
467                 return;
468         }
469
470         tdb->file->map_size = tdb->transaction->old_map_size;
471
472         /* free all the transaction blocks */
473         for (i=0;i<tdb->transaction->num_blocks;i++) {
474                 if (tdb->transaction->blocks[i] != NULL) {
475                         free(tdb->transaction->blocks[i]);
476                 }
477         }
478         SAFE_FREE(tdb->transaction->blocks);
479
480         if (tdb->transaction->magic_offset) {
481                 const struct tdb_methods *methods = tdb->transaction->io_methods;
482                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
483
484                 /* remove the recovery marker */
485                 ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
486                                         &invalid, sizeof(invalid));
487                 if (ecode == TDB_SUCCESS)
488                         ecode = transaction_sync(tdb,
489                                                  tdb->transaction->magic_offset,
490                                                  sizeof(invalid));
491                 if (ecode != TDB_SUCCESS) {
492                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
493                                    "tdb_transaction_cancel: failed to remove"
494                                    " recovery magic");
495                 }
496         }
497
498         if (tdb->file->allrecord_lock.count)
499                 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
500
501         /* restore the normal io methods */
502         tdb->methods = tdb->transaction->io_methods;
503
504         tdb_transaction_unlock(tdb, F_WRLCK);
505
506         if (tdb_has_open_lock(tdb))
507                 tdb_unlock_open(tdb);
508
509         SAFE_FREE(tdb->transaction);
510 }
511
512 /*
513   start a tdb transaction. No token is returned, as only a single
514   transaction is allowed to be pending per tdb_context
515 */
516 enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb)
517 {
518         enum TDB_ERROR ecode;
519
520         /* some sanity checks */
521         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
522                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
523                                                     TDB_LOG_USE_ERROR,
524                                                     "tdb_transaction_start:"
525                                                     " cannot start a"
526                                                     " transaction on a "
527                                                     "read-only or internal db");
528         }
529
530         /* cope with nested tdb_transaction_start() calls */
531         if (tdb->transaction != NULL) {
532                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO,
533                                                     TDB_LOG_USE_ERROR,
534                                                     "tdb_transaction_start:"
535                                                     " already inside"
536                                                     " transaction");
537         }
538
539         if (tdb_has_hash_locks(tdb)) {
540                 /* the caller must not have any locks when starting a
541                    transaction as otherwise we'll be screwed by lack
542                    of nested locks in POSIX */
543                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK,
544                                                     TDB_LOG_USE_ERROR,
545                                                     "tdb_transaction_start:"
546                                                     " cannot start a"
547                                                     " transaction with locks"
548                                                     " held");
549         }
550
551         tdb->transaction = (struct tdb_transaction *)
552                 calloc(sizeof(struct tdb_transaction), 1);
553         if (tdb->transaction == NULL) {
554                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM,
555                                                     TDB_LOG_ERROR,
556                                                     "tdb_transaction_start:"
557                                                     " cannot allocate");
558         }
559
560         /* get the transaction write lock. This is a blocking lock. As
561            discussed with Volker, there are a number of ways we could
562            make this async, which we will probably do in the future */
563         ecode = tdb_transaction_lock(tdb, F_WRLCK);
564         if (ecode != TDB_SUCCESS) {
565                 SAFE_FREE(tdb->transaction->blocks);
566                 SAFE_FREE(tdb->transaction);
567                 return tdb->last_error = ecode;
568         }
569
570         /* get a read lock over entire file. This is upgraded to a write
571            lock during the commit */
572         ecode = tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true);
573         if (ecode != TDB_SUCCESS) {
574                 goto fail_allrecord_lock;
575         }
576
577         /* make sure we know about any file expansions already done by
578            anyone else */
579         tdb->methods->oob(tdb, tdb->file->map_size + 1, true);
580         tdb->transaction->old_map_size = tdb->file->map_size;
581
582         /* finally hook the io methods, replacing them with
583            transaction specific methods */
584         tdb->transaction->io_methods = tdb->methods;
585         tdb->methods = &transaction_methods;
586         return tdb->last_error = TDB_SUCCESS;
587
588 fail_allrecord_lock:
589         tdb_transaction_unlock(tdb, F_WRLCK);
590         SAFE_FREE(tdb->transaction->blocks);
591         SAFE_FREE(tdb->transaction);
592         return tdb->last_error = ecode;
593 }
594
595
/*
  Public entry point for abandoning the current transaction; all of the
  work is done by _tdb_transaction_cancel().
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
	_tdb_transaction_cancel(tdb);
}
603
604 /*
605   work out how much space the linearised recovery data will consume
606 */
607 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
608 {
609         tdb_len_t recovery_size = 0;
610         int i;
611
612         recovery_size = sizeof(tdb_len_t);
613         for (i=0;i<tdb->transaction->num_blocks;i++) {
614                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
615                         break;
616                 }
617                 if (tdb->transaction->blocks[i] == NULL) {
618                         continue;
619                 }
620                 recovery_size += 2*sizeof(tdb_off_t);
621                 if (i == tdb->transaction->num_blocks-1) {
622                         recovery_size += tdb->transaction->last_block_size;
623                 } else {
624                         recovery_size += getpagesize();
625                 }
626         }
627
628         return recovery_size;
629 }
630
631 /*
632   allocate the recovery area, or use an existing recovery area if it is
633   large enough
634 */
635 static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb,
636                                             tdb_len_t *recovery_size,
637                                             tdb_off_t *recovery_offset,
638                                             tdb_len_t *recovery_max_size)
639 {
640         struct tdb_recovery_record rec;
641         const struct tdb_methods *methods = tdb->transaction->io_methods;
642         tdb_off_t recovery_head;
643         size_t addition;
644         enum TDB_ERROR ecode;
645
646         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
647         if (TDB_OFF_IS_ERR(recovery_head)) {
648                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
649                                   "tdb_recovery_allocate:"
650                                   " failed to read recovery head");
651         }
652
653         if (recovery_head != 0) {
654                 ecode = methods->tread(tdb, recovery_head, &rec, sizeof(rec));
655                 if (ecode != TDB_SUCCESS) {
656                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
657                                           "tdb_recovery_allocate:"
658                                           " failed to read recovery record");
659                 }
660                 tdb_convert(tdb, &rec, sizeof(rec));
661                 /* ignore invalid recovery regions: can happen in crash */
662                 if (rec.magic != TDB_RECOVERY_MAGIC &&
663                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
664                         recovery_head = 0;
665                 }
666         }
667
668         *recovery_size = tdb_recovery_size(tdb);
669
670         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
671                 /* it fits in the existing area */
672                 *recovery_max_size = rec.max_len;
673                 *recovery_offset = recovery_head;
674                 return TDB_SUCCESS;
675         }
676
677         /* we need to free up the old recovery area, then allocate a
678            new one at the end of the file. Note that we cannot use
679            normal allocation to allocate the new one as that might return
680            us an area that is being currently used (as of the start of
681            the transaction) */
682         if (recovery_head != 0) {
683                 add_stat(tdb, frees, 1);
684                 ecode = add_free_record(tdb, recovery_head,
685                                         sizeof(rec) + rec.max_len);
686                 if (ecode != TDB_SUCCESS) {
687                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
688                                           "tdb_recovery_allocate:"
689                                           " failed to free previous"
690                                           " recovery area");
691                 }
692         }
693
694         /* the tdb_free() call might have increased the recovery size */
695         *recovery_size = tdb_recovery_size(tdb);
696
697         /* round up to a multiple of page size */
698         *recovery_max_size
699                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
700                    & ~(getpagesize()-1))
701                 - sizeof(rec);
702         *recovery_offset = tdb->file->map_size;
703         recovery_head = *recovery_offset;
704
705         /* Restore ->map_size before calling underlying expand_file.
706            Also so that we don't try to expand the file again in the
707            transaction commit, which would destroy the recovery
708            area */
709         addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
710                 sizeof(rec) + *recovery_max_size;
711         tdb->file->map_size = tdb->transaction->old_map_size;
712         ecode = methods->expand_file(tdb, addition);
713         if (ecode != TDB_SUCCESS) {
714                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
715                                   "tdb_recovery_allocate:"
716                                   " failed to create recovery area");
717         }
718
719         /* we have to reset the old map size so that we don't try to
720            expand the file again in the transaction commit, which
721            would destroy the recovery area */
722         tdb->transaction->old_map_size = tdb->file->map_size;
723
724         /* write the recovery header offset and sync - we can sync without a race here
725            as the magic ptr in the recovery record has not been set */
726         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
727         ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
728                                 &recovery_head, sizeof(tdb_off_t));
729         if (ecode != TDB_SUCCESS) {
730                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
731                                   "tdb_recovery_allocate:"
732                                   " failed to write recovery head");
733         }
734         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
735                                    &recovery_head,
736                                    sizeof(tdb_off_t));
737         return TDB_SUCCESS;
738 }
739
740 /* Set up header for the recovery record. */
741 static void set_recovery_header(struct tdb_recovery_record *rec,
742                                 uint64_t magic,
743                                 uint64_t datalen, uint64_t actuallen,
744                                 uint64_t oldsize)
745 {
746         rec->magic = magic;
747         rec->max_len = actuallen;
748         rec->len = datalen;
749         rec->eof = oldsize;
750 }
751
752 /*
753   setup the recovery data that will be used on a crash during commit
754 */
755 static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb,
756                                                  tdb_off_t *magic_offset)
757 {
758         /* Initialized for GCC's 4.4.5 overzealous uninitialized warnings. */
759         tdb_len_t recovery_size = 0;
760         tdb_off_t recovery_offset = 0, recovery_max_size = 0;
761         unsigned char *data, *p;
762         const struct tdb_methods *methods = tdb->transaction->io_methods;
763         struct tdb_recovery_record *rec;
764         tdb_off_t old_map_size = tdb->transaction->old_map_size;
765         uint64_t magic, tailer;
766         int i;
767         enum TDB_ERROR ecode;
768
769         /*
770           check that the recovery area has enough space
771         */
772         ecode = tdb_recovery_allocate(tdb, &recovery_size,
773                                       &recovery_offset, &recovery_max_size);
774         if (ecode != TDB_SUCCESS) {
775                 return ecode;
776         }
777
778         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
779         if (data == NULL) {
780                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
781                                   "transaction_setup_recovery:"
782                                   " cannot allocate");
783         }
784
785         rec = (struct tdb_recovery_record *)data;
786         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
787                             recovery_size, recovery_max_size, old_map_size);
788         tdb_convert(tdb, rec, sizeof(*rec));
789
790         /* build the recovery data into a single blob to allow us to do a single
791            large write, which should be more efficient */
792         p = data + sizeof(*rec);
793         for (i=0;i<tdb->transaction->num_blocks;i++) {
794                 tdb_off_t offset;
795                 tdb_len_t length;
796
797                 if (tdb->transaction->blocks[i] == NULL) {
798                         continue;
799                 }
800
801                 offset = i * getpagesize();
802                 length = getpagesize();
803                 if (i == tdb->transaction->num_blocks-1) {
804                         length = tdb->transaction->last_block_size;
805                 }
806
807                 if (offset >= old_map_size) {
808                         continue;
809                 }
810                 if (offset + length > tdb->file->map_size) {
811                         free(data);
812                         return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
813                                           "tdb_transaction_setup_recovery:"
814                                           " transaction data over new region"
815                                           " boundary");
816                 }
817                 memcpy(p, &offset, sizeof(offset));
818                 memcpy(p + sizeof(offset), &length, sizeof(length));
819                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
820
821                 /* the recovery area contains the old data, not the
822                    new data, so we have to call the original tdb_read
823                    method to get it */
824                 ecode = methods->tread(tdb, offset,
825                                        p + sizeof(offset) + sizeof(length),
826                                        length);
827                 if (ecode != TDB_SUCCESS) {
828                         free(data);
829                         return ecode;
830                 }
831                 p += sizeof(offset) + sizeof(length) + length;
832         }
833
834         /* and the tailer */
835         tailer = sizeof(*rec) + recovery_max_size;
836         memcpy(p, &tailer, sizeof(tailer));
837         tdb_convert(tdb, p, sizeof(tailer));
838
839         /* write the recovery data to the recovery area */
840         ecode = methods->twrite(tdb, recovery_offset, data,
841                                 sizeof(*rec) + recovery_size);
842         if (ecode != TDB_SUCCESS) {
843                 free(data);
844                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
845                                   "tdb_transaction_setup_recovery:"
846                                   " failed to write recovery data");
847         }
848         transaction_write_existing(tdb, recovery_offset, data,
849                                    sizeof(*rec) + recovery_size);
850
851         /* as we don't have ordered writes, we have to sync the recovery
852            data before we update the magic to indicate that the recovery
853            data is present */
854         ecode = transaction_sync(tdb, recovery_offset,
855                                  sizeof(*rec) + recovery_size);
856         if (ecode != TDB_SUCCESS) {
857                 free(data);
858                 return ecode;
859         }
860
861         free(data);
862
863         magic = TDB_RECOVERY_MAGIC;
864         tdb_convert(tdb, &magic, sizeof(magic));
865
866         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
867                                                    magic);
868
869         ecode = methods->twrite(tdb, *magic_offset, &magic, sizeof(magic));
870         if (ecode != TDB_SUCCESS) {
871                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
872                                   "tdb_transaction_setup_recovery:"
873                                   " failed to write recovery magic");
874         }
875         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
876
877         /* ensure the recovery magic marker is on disk */
878         return transaction_sync(tdb, *magic_offset, sizeof(magic));
879 }
880
881 static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
882 {
883         const struct tdb_methods *methods;
884         enum TDB_ERROR ecode;
885
886         if (tdb->transaction == NULL) {
887                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
888                                   "tdb_transaction_prepare_commit:"
889                                   " no transaction");
890         }
891
892         if (tdb->transaction->prepared) {
893                 _tdb_transaction_cancel(tdb);
894                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
895                                   "tdb_transaction_prepare_commit:"
896                                   " transaction already prepared");
897         }
898
899         if (tdb->transaction->transaction_error) {
900                 _tdb_transaction_cancel(tdb);
901                 return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_ERROR,
902                                   "tdb_transaction_prepare_commit:"
903                                   " transaction error pending");
904         }
905
906
907         if (tdb->transaction->nesting != 0) {
908                 tdb->transaction->nesting--;
909                 return TDB_SUCCESS;
910         }
911
912         /* check for a null transaction */
913         if (tdb->transaction->blocks == NULL) {
914                 return TDB_SUCCESS;
915         }
916
917         methods = tdb->transaction->io_methods;
918
919         /* upgrade the main transaction lock region to a write lock */
920         ecode = tdb_allrecord_upgrade(tdb);
921         if (ecode != TDB_SUCCESS) {
922                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
923                          "tdb_transaction_prepare_commit:"
924                          " failed to upgrade hash locks");
925                 _tdb_transaction_cancel(tdb);
926                 return ecode;
927         }
928
929         /* get the open lock - this prevents new users attaching to the database
930            during the commit */
931         ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
932         if (ecode != TDB_SUCCESS) {
933                 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
934                            "tdb_transaction_prepare_commit:"
935                            " failed to get open lock");
936                 _tdb_transaction_cancel(tdb);
937                 return ecode;
938         }
939
940         /* Since we have whole db locked, we don't need the expansion lock. */
941         if (!(tdb->flags & TDB_NOSYNC)) {
942                 /* write the recovery data to the end of the file */
943                 ecode = transaction_setup_recovery(tdb,
944                                                    &tdb->transaction
945                                                    ->magic_offset);
946                 if (ecode != TDB_SUCCESS) {
947                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
948                                  "tdb_transaction_prepare_commit:"
949                                  " failed to setup recovery data");
950                         _tdb_transaction_cancel(tdb);
951                         return ecode;
952                 }
953         }
954
955         tdb->transaction->prepared = true;
956
957         /* expand the file to the new size if needed */
958         if (tdb->file->map_size != tdb->transaction->old_map_size) {
959                 tdb_len_t add;
960
961                 add = tdb->file->map_size - tdb->transaction->old_map_size;
962                 /* Restore original map size for tdb_expand_file */
963                 tdb->file->map_size = tdb->transaction->old_map_size;
964                 ecode = methods->expand_file(tdb, add);
965                 if (ecode != TDB_SUCCESS) {
966                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
967                                  "tdb_transaction_prepare_commit:"
968                                  " expansion failed");
969                         _tdb_transaction_cancel(tdb);
970                         return ecode;
971                 }
972         }
973
974         /* Keep the open lock until the actual commit */
975         return TDB_SUCCESS;
976 }
977
978 /*
979    prepare to commit the current transaction
980 */
981 enum TDB_ERROR tdb_transaction_prepare_commit(struct tdb_context *tdb)
982 {
983         return _tdb_transaction_prepare_commit(tdb);
984 }
985
986 /*
987   commit the current transaction
988 */
989 enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb)
990 {
991         const struct tdb_methods *methods;
992         int i;
993         enum TDB_ERROR ecode;
994
995         if (tdb->transaction == NULL) {
996                 return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
997                                                     TDB_LOG_USE_ERROR,
998                                                     "tdb_transaction_commit:"
999                                                     " no transaction");
1000         }
1001
1002         tdb_trace(tdb, "tdb_transaction_commit");
1003
1004         if (tdb->transaction->nesting != 0) {
1005                 tdb->transaction->nesting--;
1006                 return tdb->last_error = TDB_SUCCESS;
1007         }
1008
1009         /* check for a null transaction */
1010         if (tdb->transaction->blocks == NULL) {
1011                 _tdb_transaction_cancel(tdb);
1012                 return tdb->last_error = TDB_SUCCESS;
1013         }
1014
1015         if (!tdb->transaction->prepared) {
1016                 ecode = _tdb_transaction_prepare_commit(tdb);
1017                 if (ecode != TDB_SUCCESS)
1018                         return tdb->last_error = ecode;
1019         }
1020
1021         methods = tdb->transaction->io_methods;
1022
1023         /* perform all the writes */
1024         for (i=0;i<tdb->transaction->num_blocks;i++) {
1025                 tdb_off_t offset;
1026                 tdb_len_t length;
1027
1028                 if (tdb->transaction->blocks[i] == NULL) {
1029                         continue;
1030                 }
1031
1032                 offset = i * getpagesize();
1033                 length = getpagesize();
1034                 if (i == tdb->transaction->num_blocks-1) {
1035                         length = tdb->transaction->last_block_size;
1036                 }
1037
1038                 ecode = methods->twrite(tdb, offset,
1039                                         tdb->transaction->blocks[i], length);
1040                 if (ecode != TDB_SUCCESS) {
1041                         tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1042                                    "tdb_transaction_commit:"
1043                                    " write failed during commit");
1044
1045                         /* we've overwritten part of the data and
1046                            possibly expanded the file, so we need to
1047                            run the crash recovery code */
1048                         tdb->methods = methods;
1049                         tdb_transaction_recover(tdb);
1050
1051                         _tdb_transaction_cancel(tdb);
1052
1053                         return tdb->last_error = ecode;
1054                 }
1055                 SAFE_FREE(tdb->transaction->blocks[i]);
1056         }
1057
1058         SAFE_FREE(tdb->transaction->blocks);
1059         tdb->transaction->num_blocks = 0;
1060
1061         /* ensure the new data is on disk */
1062         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1063         if (ecode != TDB_SUCCESS) {
1064                 return tdb->last_error = ecode;
1065         }
1066
1067         /*
1068           TODO: maybe write to some dummy hdr field, or write to magic
1069           offset without mmap, before the last sync, instead of the
1070           utime() call
1071         */
1072
1073         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1074            don't change the mtime of the file, this means the file may
1075            not be backed up (as tdb rounding to block sizes means that
1076            file size changes are quite rare too). The following forces
1077            mtime changes when a transaction completes */
1078 #if HAVE_UTIME
1079         utime(tdb->name, NULL);
1080 #endif
1081
1082         /* use a transaction cancel to free memory and remove the
1083            transaction locks */
1084         _tdb_transaction_cancel(tdb);
1085
1086         return tdb->last_error = TDB_SUCCESS;
1087 }
1088
1089
1090 /*
1091   recover from an aborted transaction. Must be called with exclusive
1092   database write access already established (including the open
1093   lock to prevent new processes attaching)
1094 */
1095 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb)
1096 {
1097         tdb_off_t recovery_head, recovery_eof;
1098         unsigned char *data, *p;
1099         struct tdb_recovery_record rec;
1100         enum TDB_ERROR ecode;
1101
1102         /* find the recovery area */
1103         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1104         if (TDB_OFF_IS_ERR(recovery_head)) {
1105                 return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR,
1106                                   "tdb_transaction_recover:"
1107                                   " failed to read recovery head");
1108         }
1109
1110         if (recovery_head == 0) {
1111                 /* we have never allocated a recovery record */
1112                 return TDB_SUCCESS;
1113         }
1114
1115         /* read the recovery record */
1116         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1117         if (ecode != TDB_SUCCESS) {
1118                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1119                                   "tdb_transaction_recover:"
1120                                   " failed to read recovery record");
1121         }
1122
1123         if (rec.magic != TDB_RECOVERY_MAGIC) {
1124                 /* there is no valid recovery data */
1125                 return TDB_SUCCESS;
1126         }
1127
1128         if (tdb->read_only) {
1129                 return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1130                                   "tdb_transaction_recover:"
1131                                   " attempt to recover read only database");
1132         }
1133
1134         recovery_eof = rec.eof;
1135
1136         data = (unsigned char *)malloc(rec.len);
1137         if (data == NULL) {
1138                 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1139                                   "tdb_transaction_recover:"
1140                                   " failed to allocate recovery data");
1141         }
1142
1143         /* read the full recovery data */
1144         ecode = tdb->methods->tread(tdb, recovery_head + sizeof(rec), data,
1145                                     rec.len);
1146         if (ecode != TDB_SUCCESS) {
1147                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1148                                   "tdb_transaction_recover:"
1149                                   " failed to read recovery data");
1150         }
1151
1152         /* recover the file data */
1153         p = data;
1154         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1155                 tdb_off_t ofs;
1156                 tdb_len_t len;
1157                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1158                 memcpy(&ofs, p, sizeof(ofs));
1159                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1160                 p += sizeof(ofs) + sizeof(len);
1161
1162                 ecode = tdb->methods->twrite(tdb, ofs, p, len);
1163                 if (ecode != TDB_SUCCESS) {
1164                         free(data);
1165                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1166                                           "tdb_transaction_recover:"
1167                                           " failed to recover %zu bytes"
1168                                           " at offset %zu",
1169                                           (size_t)len, (size_t)ofs);
1170                 }
1171                 p += len;
1172         }
1173
1174         free(data);
1175
1176         ecode = transaction_sync(tdb, 0, tdb->file->map_size);
1177         if (ecode != TDB_SUCCESS) {
1178                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1179                                   "tdb_transaction_recover:"
1180                                   " failed to sync recovery");
1181         }
1182
1183         /* if the recovery area is after the recovered eof then remove it */
1184         if (recovery_eof <= recovery_head) {
1185                 ecode = tdb_write_off(tdb, offsetof(struct tdb_header,
1186                                                     recovery),
1187                                       0);
1188                 if (ecode != TDB_SUCCESS) {
1189                         return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1190                                           "tdb_transaction_recover:"
1191                                           " failed to remove recovery head");
1192                 }
1193         }
1194
1195         /* remove the recovery magic */
1196         ecode = tdb_write_off(tdb,
1197                               recovery_head
1198                               + offsetof(struct tdb_recovery_record, magic),
1199                               TDB_RECOVERY_INVALID_MAGIC);
1200         if (ecode != TDB_SUCCESS) {
1201                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1202                                   "tdb_transaction_recover:"
1203                                   " failed to remove recovery magic");
1204         }
1205
1206         ecode = transaction_sync(tdb, 0, recovery_eof);
1207         if (ecode != TDB_SUCCESS) {
1208                 return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
1209                                   "tdb_transaction_recover:"
1210                                   " failed to sync2 recovery");
1211         }
1212
1213         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1214                    "tdb_transaction_recover: recovered %zu byte database",
1215                    (size_t)recovery_eof);
1216
1217         /* all done */
1218         return TDB_SUCCESS;
1219 }
1220
1221 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb)
1222 {
1223         tdb_off_t recovery_head;
1224         struct tdb_recovery_record rec;
1225         enum TDB_ERROR ecode;
1226
1227         /* find the recovery area */
1228         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1229         if (TDB_OFF_IS_ERR(recovery_head)) {
1230                 return recovery_head;
1231         }
1232
1233         if (recovery_head == 0) {
1234                 /* we have never allocated a recovery record */
1235                 return false;
1236         }
1237
1238         /* read the recovery record */
1239         ecode = tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec));
1240         if (ecode != TDB_SUCCESS) {
1241                 return ecode;
1242         }
1243
1244         return (rec.magic == TDB_RECOVERY_MAGIC);
1245 }