53bcc21c62f45ae687f6a9c6876e015f9e297cf1
[ccan] / ccan / tdb2 / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the tdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
/* Release x and reset it to NULL so that it cannot be freed or used again.
   free(NULL) is a no-op, so no explicit NULL test is required. */
#define SAFE_FREE(x) do { free(x); (x) = NULL; } while (0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocate the transaction recovery record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     tdb_free() the old record to place it on the normal tdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of all writes that have
49     been performed by intercepting all tdb_write() calls. The hooked
50     transaction versions of tdb_read() and tdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in posix locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to tdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is cancelled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the tdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of tdb_write
68
69   - allow callers to mix transaction and non-transaction use of tdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or cancelled
72
73   - the commit strategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the tdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct tdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested tdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         tdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129                             tdb_len_t len)
130 {
131         size_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % getpagesize()) > getpagesize()) {
135                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
136                 if (transaction_read(tdb, off, buf, len2) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / getpagesize();
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->read(tdb, off, buf, len) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len);
169         return 0;
170
171 fail:
172         tdb->ecode = TDB_ERR_IO;
173         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
174                  "transaction_read: failed at off=%llu len=%llu\n",
175                  (long long)off, (long long)len);
176         tdb->transaction->transaction_error = 1;
177         return -1;
178 }
179
180
181 /*
182   write while in a transaction
183 */
184 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
185                              const void *buf, tdb_len_t len)
186 {
187         size_t blk;
188
189         /* Only a commit is allowed on a prepared transaction */
190         if (tdb->transaction->prepared) {
191                 tdb->ecode = TDB_ERR_EINVAL;
192                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
193                          "transaction_write: transaction already prepared,"
194                          " write not allowed\n");
195                 tdb->transaction->transaction_error = 1;
196                 return -1;
197         }
198
199         /* break it up into block sized chunks */
200         while (len + (off % getpagesize()) > getpagesize()) {
201                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
202                 if (transaction_write(tdb, off, buf, len2) != 0) {
203                         return -1;
204                 }
205                 len -= len2;
206                 off += len2;
207                 if (buf != NULL) {
208                         buf = (const void *)(len2 + (const char *)buf);
209                 }
210         }
211
212         if (len == 0) {
213                 return 0;
214         }
215
216         blk = off / getpagesize();
217         off = off % getpagesize();
218
219         if (tdb->transaction->num_blocks <= blk) {
220                 uint8_t **new_blocks;
221                 /* expand the blocks array */
222                 if (tdb->transaction->blocks == NULL) {
223                         new_blocks = (uint8_t **)malloc(
224                                 (blk+1)*sizeof(uint8_t *));
225                 } else {
226                         new_blocks = (uint8_t **)realloc(
227                                 tdb->transaction->blocks,
228                                 (blk+1)*sizeof(uint8_t *));
229                 }
230                 if (new_blocks == NULL) {
231                         tdb->ecode = TDB_ERR_OOM;
232                         goto fail;
233                 }
234                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
235                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
236                 tdb->transaction->blocks = new_blocks;
237                 tdb->transaction->num_blocks = blk+1;
238                 tdb->transaction->last_block_size = 0;
239         }
240
241         /* allocate and fill a block? */
242         if (tdb->transaction->blocks[blk] == NULL) {
243                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1);
244                 if (tdb->transaction->blocks[blk] == NULL) {
245                         tdb->ecode = TDB_ERR_OOM;
246                         tdb->transaction->transaction_error = 1;
247                         return -1;
248                 }
249                 if (tdb->transaction->old_map_size > blk * getpagesize()) {
250                         tdb_len_t len2 = getpagesize();
251                         if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) {
252                                 len2 = tdb->transaction->old_map_size - (blk * getpagesize());
253                         }
254                         if (tdb->transaction->io_methods->read(tdb, blk * getpagesize(),
255                                                                tdb->transaction->blocks[blk],
256                                                                len2) != 0) {
257                                 SAFE_FREE(tdb->transaction->blocks[blk]);
258                                 goto fail;
259                         }
260                         if (blk == tdb->transaction->num_blocks-1) {
261                                 tdb->transaction->last_block_size = len2;
262                         }
263                 }
264         }
265
266         /* overwrite part of an existing block */
267         if (buf == NULL) {
268                 memset(tdb->transaction->blocks[blk] + off, 0, len);
269         } else {
270                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271         }
272         if (blk == tdb->transaction->num_blocks-1) {
273                 if (len + off > tdb->transaction->last_block_size) {
274                         tdb->transaction->last_block_size = len + off;
275                 }
276         }
277
278         return 0;
279
280 fail:
281         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
282                  "transaction_write: failed at off=%llu len=%llu\n",
283                  (long long)((blk*getpagesize()) + off),
284                  (long long)len);
285         tdb->transaction->transaction_error = 1;
286         return -1;
287 }
288
289
290 /*
291   write while in a transaction - this varient never expands the transaction blocks, it only
292   updates existing blocks. This means it cannot change the recovery size
293 */
294 static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
295                                        const void *buf, tdb_len_t len)
296 {
297         size_t blk;
298
299         /* break it up into block sized chunks */
300         while (len + (off % getpagesize()) > getpagesize()) {
301                 tdb_len_t len2 = getpagesize() - (off % getpagesize());
302                 transaction_write_existing(tdb, off, buf, len2);
303                 len -= len2;
304                 off += len2;
305                 if (buf != NULL) {
306                         buf = (const void *)(len2 + (const char *)buf);
307                 }
308         }
309
310         if (len == 0) {
311                 return;
312         }
313
314         blk = off / getpagesize();
315         off = off % getpagesize();
316
317         if (tdb->transaction->num_blocks <= blk ||
318             tdb->transaction->blocks[blk] == NULL) {
319                 return;
320         }
321
322         if (blk == tdb->transaction->num_blocks-1 &&
323             off + len > tdb->transaction->last_block_size) {
324                 if (off >= tdb->transaction->last_block_size) {
325                         return;
326                 }
327                 len = tdb->transaction->last_block_size - off;
328         }
329
330         /* overwrite part of an existing block */
331         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
332 }
333
334
335 /*
336   out of bounds check during a transaction
337 */
338 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
339 {
340         if (len <= tdb->map_size) {
341                 return 0;
342         }
343         tdb->ecode = TDB_ERR_IO;
344         return -1;
345 }
346
347 /*
348   transaction version of tdb_expand().
349 */
350 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
351 {
352         /* add a write to the transaction elements, so subsequent
353            reads see the zero data */
354         if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) {
355                 return -1;
356         }
357         tdb->map_size += addition;
358         return 0;
359 }
360
361 static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
362                                 size_t len)
363 {
364         /* FIXME */
365         return NULL;
366 }
367
368 static const struct tdb_methods transaction_methods = {
369         transaction_read,
370         transaction_write,
371         transaction_oob,
372         transaction_expand_file,
373         transaction_direct,
374 };
375
376 /*
377   sync to disk
378 */
379 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
380 {
381         if (tdb->flags & TDB_NOSYNC) {
382                 return 0;
383         }
384
385         if (fsync(tdb->fd) != 0) {
386                 tdb->ecode = TDB_ERR_IO;
387                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
388                          "tdb_transaction: fsync failed\n");
389                 return -1;
390         }
391 #ifdef MS_SYNC
392         if (tdb->map_ptr) {
393                 tdb_off_t moffset = offset & ~(getpagesize()-1);
394                 if (msync(moffset + (char *)tdb->map_ptr,
395                           length + (offset - moffset), MS_SYNC) != 0) {
396                         tdb->ecode = TDB_ERR_IO;
397                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
398                                  "tdb_transaction: msync failed - %s\n",
399                                  strerror(errno));
400                         return -1;
401                 }
402         }
403 #endif
404         return 0;
405 }
406
407
408 static void _tdb_transaction_cancel(struct tdb_context *tdb)
409 {
410         int i;
411
412         if (tdb->transaction == NULL) {
413                 tdb->ecode = TDB_ERR_EINVAL;
414                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
415                          "tdb_transaction_cancel: no transaction\n");
416                 return;
417         }
418
419         if (tdb->transaction->nesting != 0) {
420                 tdb->transaction->transaction_error = 1;
421                 tdb->transaction->nesting--;
422                 return;
423         }
424
425         tdb->map_size = tdb->transaction->old_map_size;
426
427         /* free all the transaction blocks */
428         for (i=0;i<tdb->transaction->num_blocks;i++) {
429                 if (tdb->transaction->blocks[i] != NULL) {
430                         free(tdb->transaction->blocks[i]);
431                 }
432         }
433         SAFE_FREE(tdb->transaction->blocks);
434
435         if (tdb->transaction->magic_offset) {
436                 const struct tdb_methods *methods = tdb->transaction->io_methods;
437                 uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC;
438
439                 /* remove the recovery marker */
440                 if (methods->write(tdb, tdb->transaction->magic_offset,
441                                    &invalid, sizeof(invalid)) == -1 ||
442                     transaction_sync(tdb, tdb->transaction->magic_offset,
443                                      sizeof(invalid)) == -1) {
444                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
445                                  "tdb_transaction_cancel: failed to remove"
446                                  " recovery magic\n");
447                 }
448         }
449
450         if (tdb->allrecord_lock.count)
451                 tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype);
452
453         /* restore the normal io methods */
454         tdb->methods = tdb->transaction->io_methods;
455
456         tdb_transaction_unlock(tdb, F_WRLCK);
457         tdb_unlock_expand(tdb, F_WRLCK);
458
459         if (tdb_has_open_lock(tdb))
460                 tdb_unlock_open(tdb);
461
462         SAFE_FREE(tdb->transaction);
463 }
464
465 /*
466   start a tdb transaction. No token is returned, as only a single
467   transaction is allowed to be pending per tdb_context
468 */
469 int tdb_transaction_start(struct tdb_context *tdb)
470 {
471         /* some sanity checks */
472         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
473                 tdb->ecode = TDB_ERR_EINVAL;
474                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
475                          "tdb_transaction_start: cannot start a transaction"
476                          " on a read-only or internal db\n");
477                 return -1;
478         }
479
480         /* cope with nested tdb_transaction_start() calls */
481         if (tdb->transaction != NULL) {
482                 tdb->ecode = TDB_ERR_NESTING;
483                 return -1;
484         }
485
486         if (tdb_has_hash_locks(tdb)) {
487                 /* the caller must not have any locks when starting a
488                    transaction as otherwise we'll be screwed by lack
489                    of nested locks in posix */
490                 tdb->ecode = TDB_ERR_LOCK;
491                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
492                          "tdb_transaction_start: cannot start a transaction"
493                          " with locks held\n");
494                 return -1;
495         }
496
497         tdb->transaction = (struct tdb_transaction *)
498                 calloc(sizeof(struct tdb_transaction), 1);
499         if (tdb->transaction == NULL) {
500                 tdb->ecode = TDB_ERR_OOM;
501                 return -1;
502         }
503
504         /* get the transaction write lock. This is a blocking lock. As
505            discussed with Volker, there are a number of ways we could
506            make this async, which we will probably do in the future */
507         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
508                 SAFE_FREE(tdb->transaction->blocks);
509                 SAFE_FREE(tdb->transaction);
510                 return -1;
511         }
512
513         /* get a read lock over entire file. This is upgraded to a write
514            lock during the commit */
515         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
516                 goto fail_allrecord_lock;
517         }
518
519         if (tdb_lock_expand(tdb, F_WRLCK) != 0) {
520                 goto fail_expand_lock;
521         }
522
523         /* make sure we know about any file expansions already done by
524            anyone else */
525         tdb->methods->oob(tdb, tdb->map_size + 1, true);
526         tdb->transaction->old_map_size = tdb->map_size;
527
528         /* finally hook the io methods, replacing them with
529            transaction specific methods */
530         tdb->transaction->io_methods = tdb->methods;
531         tdb->methods = &transaction_methods;
532         return 0;
533
534 fail_expand_lock:
535         tdb_allrecord_unlock(tdb, F_RDLCK);
536 fail_allrecord_lock:
537         tdb_transaction_unlock(tdb, F_WRLCK);
538         SAFE_FREE(tdb->transaction->blocks);
539         SAFE_FREE(tdb->transaction);
540         return -1;
541 }
542
543
/*
  cancel the current transaction. Public entry point; all of the work
  is done by _tdb_transaction_cancel().
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
	_tdb_transaction_cancel(tdb);
}
551
552 /*
553   work out how much space the linearised recovery data will consume
554 */
555 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
556 {
557         tdb_len_t recovery_size = 0;
558         int i;
559
560         recovery_size = sizeof(tdb_len_t);
561         for (i=0;i<tdb->transaction->num_blocks;i++) {
562                 if (i * getpagesize() >= tdb->transaction->old_map_size) {
563                         break;
564                 }
565                 if (tdb->transaction->blocks[i] == NULL) {
566                         continue;
567                 }
568                 recovery_size += 2*sizeof(tdb_off_t);
569                 if (i == tdb->transaction->num_blocks-1) {
570                         recovery_size += tdb->transaction->last_block_size;
571                 } else {
572                         recovery_size += getpagesize();
573                 }
574         }
575
576         return recovery_size;
577 }
578
579 /*
580   allocate the recovery area, or use an existing recovery area if it is
581   large enough
582 */
583 static int tdb_recovery_allocate(struct tdb_context *tdb,
584                                  tdb_len_t *recovery_size,
585                                  tdb_off_t *recovery_offset,
586                                  tdb_len_t *recovery_max_size)
587 {
588         struct tdb_recovery_record rec;
589         const struct tdb_methods *methods = tdb->transaction->io_methods;
590         tdb_off_t recovery_head;
591         size_t addition;
592
593         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
594         if (recovery_head == TDB_OFF_ERR) {
595                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
596                          "tdb_recovery_allocate:"
597                          " failed to read recovery head\n");
598                 return -1;
599         }
600
601         if (recovery_head != 0) {
602                 if (methods->read(tdb, recovery_head, &rec, sizeof(rec))) {
603                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
604                                  "tdb_recovery_allocate:"
605                                  " failed to read recovery record\n");
606                         return -1;
607                 }
608                 tdb_convert(tdb, &rec, sizeof(rec));
609                 /* ignore invalid recovery regions: can happen in crash */
610                 if (rec.magic != TDB_RECOVERY_MAGIC &&
611                     rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
612                         recovery_head = 0;
613                 }
614         }
615
616         *recovery_size = tdb_recovery_size(tdb);
617
618         if (recovery_head != 0 && *recovery_size <= rec.max_len) {
619                 /* it fits in the existing area */
620                 *recovery_max_size = rec.max_len;
621                 *recovery_offset = recovery_head;
622                 return 0;
623         }
624
625         /* we need to free up the old recovery area, then allocate a
626            new one at the end of the file. Note that we cannot use
627            normal allocation to allocate the new one as that might return
628            us an area that is being currently used (as of the start of
629            the transaction) */
630         if (recovery_head != 0) {
631                 if (add_free_record(tdb, recovery_head,
632                                     sizeof(rec) + rec.max_len) != 0) {
633                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
634                                  "tdb_recovery_allocate:"
635                                  " failed to free previous recovery area\n");
636                         return -1;
637                 }
638         }
639
640         /* the tdb_free() call might have increased the recovery size */
641         *recovery_size = tdb_recovery_size(tdb);
642
643         /* round up to a multiple of page size */
644         *recovery_max_size
645                 = (((sizeof(rec) + *recovery_size) + getpagesize()-1)
646                    & ~(getpagesize()-1))
647                 - sizeof(rec);
648         *recovery_offset = tdb->map_size;
649         recovery_head = *recovery_offset;
650
651         /* Restore ->map_size before calling underlying expand_file.
652            Also so that we don't try to expand the file again in the
653            transaction commit, which would destroy the recovery
654            area */
655         addition = (tdb->map_size - tdb->transaction->old_map_size) +
656                 sizeof(rec) + *recovery_max_size;
657         tdb->map_size = tdb->transaction->old_map_size;
658         if (methods->expand_file(tdb, addition) == -1) {
659                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
660                          "tdb_recovery_allocate:"
661                          " failed to create recovery area\n");
662                 return -1;
663         }
664
665         /* we have to reset the old map size so that we don't try to
666            expand the file again in the transaction commit, which
667            would destroy the recovery area */
668         tdb->transaction->old_map_size = tdb->map_size;
669
670         /* write the recovery header offset and sync - we can sync without a race here
671            as the magic ptr in the recovery record has not been set */
672         tdb_convert(tdb, &recovery_head, sizeof(recovery_head));
673         if (methods->write(tdb, offsetof(struct tdb_header, recovery),
674                            &recovery_head, sizeof(tdb_off_t)) == -1) {
675                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
676                          "tdb_recovery_allocate:"
677                          " failed to write recovery head\n");
678                 return -1;
679         }
680         transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
681                                    &recovery_head,
682                                    sizeof(tdb_off_t));
683         return 0;
684 }
685
686 /* Set up header for the recovery record. */
687 static void set_recovery_header(struct tdb_recovery_record *rec,
688                                 uint64_t magic,
689                                 uint64_t datalen, uint64_t actuallen,
690                                 uint64_t oldsize)
691 {
692         rec->magic = magic;
693         rec->max_len = actuallen;
694         rec->len = datalen;
695         rec->eof = oldsize;
696 }
697
698 /*
699   setup the recovery data that will be used on a crash during commit
700 */
701 static int transaction_setup_recovery(struct tdb_context *tdb,
702                                       tdb_off_t *magic_offset)
703 {
704         tdb_len_t recovery_size;
705         unsigned char *data, *p;
706         const struct tdb_methods *methods = tdb->transaction->io_methods;
707         struct tdb_recovery_record *rec;
708         tdb_off_t recovery_offset, recovery_max_size;
709         tdb_off_t old_map_size = tdb->transaction->old_map_size;
710         uint64_t magic, tailer;
711         int i;
712
713         /*
714           check that the recovery area has enough space
715         */
716         if (tdb_recovery_allocate(tdb, &recovery_size,
717                                   &recovery_offset, &recovery_max_size) == -1) {
718                 return -1;
719         }
720
721         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
722         if (data == NULL) {
723                 tdb->ecode = TDB_ERR_OOM;
724                 return -1;
725         }
726
727         rec = (struct tdb_recovery_record *)data;
728         set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC,
729                             recovery_size, recovery_max_size, old_map_size);
730         tdb_convert(tdb, rec, sizeof(*rec));
731
732         /* build the recovery data into a single blob to allow us to do a single
733            large write, which should be more efficient */
734         p = data + sizeof(*rec);
735         for (i=0;i<tdb->transaction->num_blocks;i++) {
736                 tdb_off_t offset;
737                 tdb_len_t length;
738
739                 if (tdb->transaction->blocks[i] == NULL) {
740                         continue;
741                 }
742
743                 offset = i * getpagesize();
744                 length = getpagesize();
745                 if (i == tdb->transaction->num_blocks-1) {
746                         length = tdb->transaction->last_block_size;
747                 }
748
749                 if (offset >= old_map_size) {
750                         continue;
751                 }
752                 if (offset + length > tdb->map_size) {
753                         tdb->ecode = TDB_ERR_CORRUPT;
754                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
755                                  "tdb_transaction_setup_recovery:"
756                                  " transaction data over new region boundary\n");
757                         free(data);
758                         return -1;
759                 }
760                 memcpy(p, &offset, sizeof(offset));
761                 memcpy(p + sizeof(offset), &length, sizeof(length));
762                 tdb_convert(tdb, p, sizeof(offset) + sizeof(length));
763
764                 /* the recovery area contains the old data, not the
765                    new data, so we have to call the original tdb_read
766                    method to get it */
767                 if (methods->read(tdb, offset,
768                                   p + sizeof(offset) + sizeof(length),
769                                   length) != 0) {
770                         free(data);
771                         return -1;
772                 }
773                 p += sizeof(offset) + sizeof(length) + length;
774         }
775
776         /* and the tailer */
777         tailer = sizeof(*rec) + recovery_max_size;
778         memcpy(p, &tailer, sizeof(tailer));
779         tdb_convert(tdb, p, sizeof(tailer));
780
781         /* write the recovery data to the recovery area */
782         if (methods->write(tdb, recovery_offset, data,
783                            sizeof(*rec) + recovery_size) == -1) {
784                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
785                          "tdb_transaction_setup_recovery:"
786                          " failed to write recovery data\n");
787                 free(data);
788                 return -1;
789         }
790         transaction_write_existing(tdb, recovery_offset, data,
791                                    sizeof(*rec) + recovery_size);
792
793         /* as we don't have ordered writes, we have to sync the recovery
794            data before we update the magic to indicate that the recovery
795            data is present */
796         if (transaction_sync(tdb, recovery_offset,
797                              sizeof(*rec) + recovery_size) == -1) {
798                 free(data);
799                 return -1;
800         }
801
802         free(data);
803
804         magic = TDB_RECOVERY_MAGIC;
805         tdb_convert(tdb, &magic, sizeof(magic));
806
807         *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record,
808                                                    magic);
809
810         if (methods->write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
811                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
812                          "tdb_transaction_setup_recovery:"
813                          " failed to write recovery magic\n");
814                 return -1;
815         }
816         transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic));
817
818         /* ensure the recovery magic marker is on disk */
819         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
820                 return -1;
821         }
822
823         return 0;
824 }
825
826 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
827 {
828         const struct tdb_methods *methods;
829
830         if (tdb->transaction == NULL) {
831                 tdb->ecode = TDB_ERR_EINVAL;
832                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
833                          "tdb_transaction_prepare_commit: no transaction\n");
834                 return -1;
835         }
836
837         if (tdb->transaction->prepared) {
838                 tdb->ecode = TDB_ERR_EINVAL;
839                 _tdb_transaction_cancel(tdb);
840                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
841                          "tdb_transaction_prepare_commit:"
842                          " transaction already prepared\n");
843                 return -1;
844         }
845
846         if (tdb->transaction->transaction_error) {
847                 tdb->ecode = TDB_ERR_IO;
848                 _tdb_transaction_cancel(tdb);
849                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
850                          "tdb_transaction_prepare_commit:"
851                          " transaction error pending\n");
852                 return -1;
853         }
854
855
856         if (tdb->transaction->nesting != 0) {
857                 tdb->transaction->nesting--;
858                 return 0;
859         }
860
861         /* check for a null transaction */
862         if (tdb->transaction->blocks == NULL) {
863                 return 0;
864         }
865
866         methods = tdb->transaction->io_methods;
867
868         /* upgrade the main transaction lock region to a write lock */
869         if (tdb_allrecord_upgrade(tdb) == -1) {
870                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
871                          "tdb_transaction_prepare_commit:"
872                          " failed to upgrade hash locks\n");
873                 _tdb_transaction_cancel(tdb);
874                 return -1;
875         }
876
877         /* get the open lock - this prevents new users attaching to the database
878            during the commit */
879         if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) {
880                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
881                          "tdb_transaction_prepare_commit:"
882                          " failed to get open lock\n");
883                 _tdb_transaction_cancel(tdb);
884                 return -1;
885         }
886
887         if (!(tdb->flags & TDB_NOSYNC)) {
888                 /* write the recovery data to the end of the file */
889                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
890                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
891                                  "tdb_transaction_prepare_commit:"
892                                  " failed to setup recovery data\n");
893                         _tdb_transaction_cancel(tdb);
894                         return -1;
895                 }
896         }
897
898         tdb->transaction->prepared = true;
899
900         /* expand the file to the new size if needed */
901         if (tdb->map_size != tdb->transaction->old_map_size) {
902                 tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size;
903                 /* Restore original map size for tdb_expand_file */
904                 tdb->map_size = tdb->transaction->old_map_size;
905                 if (methods->expand_file(tdb, add) == -1) {
906                         tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
907                                  "tdb_transaction_prepare_commit:"
908                                  " expansion failed\n");
909                         _tdb_transaction_cancel(tdb);
910                         return -1;
911                 }
912         }
913
914         /* Keep the open lock until the actual commit */
915
916         return 0;
917 }
918
/*
   public entry point: prepare the current transaction for commit.
   Hands straight off to _tdb_transaction_prepare_commit() and passes
   its result back unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret;

	ret = _tdb_transaction_prepare_commit(tdb);
	return ret;
}
926
927 /*
928   commit the current transaction
929 */
930 int tdb_transaction_commit(struct tdb_context *tdb)
931 {
932         const struct tdb_methods *methods;
933         int i;
934
935         if (tdb->transaction == NULL) {
936                 tdb->ecode = TDB_ERR_EINVAL;
937                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
938                          "tdb_transaction_commit: no transaction\n");
939                 return -1;
940         }
941
942         tdb_trace(tdb, "tdb_transaction_commit");
943
944         if (tdb->transaction->transaction_error) {
945                 tdb->ecode = TDB_ERR_IO;
946                 tdb_transaction_cancel(tdb);
947                 tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
948                          "tdb_transaction_commit: transaction error pending\n");
949                 return -1;
950         }
951
952
953         if (tdb->transaction->nesting != 0) {
954                 tdb->transaction->nesting--;
955                 return 0;
956         }
957
958         /* check for a null transaction */
959         if (tdb->transaction->blocks == NULL) {
960                 _tdb_transaction_cancel(tdb);
961                 return 0;
962         }
963
964         if (!tdb->transaction->prepared) {
965                 int ret = _tdb_transaction_prepare_commit(tdb);
966                 if (ret)
967                         return ret;
968         }
969
970         methods = tdb->transaction->io_methods;
971
972         /* perform all the writes */
973         for (i=0;i<tdb->transaction->num_blocks;i++) {
974                 tdb_off_t offset;
975                 tdb_len_t length;
976
977                 if (tdb->transaction->blocks[i] == NULL) {
978                         continue;
979                 }
980
981                 offset = i * getpagesize();
982                 length = getpagesize();
983                 if (i == tdb->transaction->num_blocks-1) {
984                         length = tdb->transaction->last_block_size;
985                 }
986
987                 if (methods->write(tdb, offset, tdb->transaction->blocks[i],
988                                    length) == -1) {
989                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
990                                  "tdb_transaction_commit:"
991                                  " write failed during commit\n");
992
993                         /* we've overwritten part of the data and
994                            possibly expanded the file, so we need to
995                            run the crash recovery code */
996                         tdb->methods = methods;
997                         tdb_transaction_recover(tdb);
998
999                         _tdb_transaction_cancel(tdb);
1000
1001                         return -1;
1002                 }
1003                 SAFE_FREE(tdb->transaction->blocks[i]);
1004         }
1005
1006         SAFE_FREE(tdb->transaction->blocks);
1007         tdb->transaction->num_blocks = 0;
1008
1009         /* ensure the new data is on disk */
1010         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1011                 return -1;
1012         }
1013
1014         /*
1015           TODO: maybe write to some dummy hdr field, or write to magic
1016           offset without mmap, before the last sync, instead of the
1017           utime() call
1018         */
1019
1020         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1021            don't change the mtime of the file, this means the file may
1022            not be backed up (as tdb rounding to block sizes means that
1023            file size changes are quite rare too). The following forces
1024            mtime changes when a transaction completes */
1025 #if HAVE_UTIME
1026         utime(tdb->name, NULL);
1027 #endif
1028
1029         /* use a transaction cancel to free memory and remove the
1030            transaction locks */
1031         _tdb_transaction_cancel(tdb);
1032
1033         return 0;
1034 }
1035
1036
1037 /*
1038   recover from an aborted transaction. Must be called with exclusive
1039   database write access already established (including the open
1040   lock to prevent new processes attaching)
1041 */
1042 int tdb_transaction_recover(struct tdb_context *tdb)
1043 {
1044         tdb_off_t recovery_head, recovery_eof;
1045         unsigned char *data, *p;
1046         struct tdb_recovery_record rec;
1047
1048         /* find the recovery area */
1049         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1050         if (recovery_head == TDB_OFF_ERR) {
1051                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1052                          "tdb_transaction_recover:"
1053                          " failed to read recovery head\n");
1054                 return -1;
1055         }
1056
1057         if (recovery_head == 0) {
1058                 /* we have never allocated a recovery record */
1059                 return 0;
1060         }
1061
1062         /* read the recovery record */
1063         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1064                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1065                          "tdb_transaction_recover:"
1066                          " failed to read recovery record\n");
1067                 return -1;
1068         }
1069
1070         if (rec.magic != TDB_RECOVERY_MAGIC) {
1071                 /* there is no valid recovery data */
1072                 return 0;
1073         }
1074
1075         if (tdb->read_only) {
1076                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1077                          "tdb_transaction_recover:"
1078                          " attempt to recover read only database\n");
1079                 tdb->ecode = TDB_ERR_CORRUPT;
1080                 return -1;
1081         }
1082
1083         recovery_eof = rec.eof;
1084
1085         data = (unsigned char *)malloc(rec.len);
1086         if (data == NULL) {
1087                 tdb->ecode = TDB_ERR_OOM;
1088                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1089                          "tdb_transaction_recover:"
1090                          " failed to allocate recovery data\n");
1091                 return -1;
1092         }
1093
1094         /* read the full recovery data */
1095         if (tdb->methods->read(tdb, recovery_head + sizeof(rec), data,
1096                                rec.len) == -1) {
1097                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1098                          "tdb_transaction_recover:"
1099                          " failed to read recovery data\n");
1100                 return -1;
1101         }
1102
1103         /* recover the file data */
1104         p = data;
1105         while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) {
1106                 tdb_off_t ofs;
1107                 tdb_len_t len;
1108                 tdb_convert(tdb, p, sizeof(ofs) + sizeof(len));
1109                 memcpy(&ofs, p, sizeof(ofs));
1110                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1111                 p += sizeof(ofs) + sizeof(len);
1112
1113                 if (tdb->methods->write(tdb, ofs, p, len) == -1) {
1114                         free(data);
1115                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1116                                  "tdb_transaction_recover:"
1117                                  " failed to recover %zu bytes at offset %zu\n",
1118                                  (size_t)len, (size_t)ofs);
1119                         return -1;
1120                 }
1121                 p += len;
1122         }
1123
1124         free(data);
1125
1126         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1127                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1128                          "tdb_transaction_recover: failed to sync recovery\n");
1129                 return -1;
1130         }
1131
1132         /* if the recovery area is after the recovered eof then remove it */
1133         if (recovery_eof <= recovery_head) {
1134                 if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0)
1135                     == -1) {
1136                         tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1137                                  "tdb_transaction_recover:"
1138                                  " failed to remove recovery head\n");
1139                         return -1;
1140                 }
1141         }
1142
1143         /* remove the recovery magic */
1144         if (tdb_write_off(tdb,
1145                           recovery_head
1146                           + offsetof(struct tdb_recovery_record, magic),
1147                           TDB_RECOVERY_INVALID_MAGIC) == -1) {
1148                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1149                          "tdb_transaction_recover:"
1150                          " failed to remove recovery magic\n");
1151                 return -1;
1152         }
1153
1154         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1155                 tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
1156                          "tdb_transaction_recover: failed to sync2 recovery\n");
1157                 return -1;
1158         }
1159
1160         tdb->log(tdb, TDB_DEBUG_TRACE, tdb->log_priv,
1161                  "tdb_transaction_recover: recovered %zu byte database\n",
1162                  (size_t)recovery_eof);
1163
1164         /* all done */
1165         return 0;
1166 }
1167
1168 /* Any I/O failures we say "needs recovery". */
1169 bool tdb_needs_recovery(struct tdb_context *tdb)
1170 {
1171         tdb_off_t recovery_head;
1172         struct tdb_recovery_record rec;
1173
1174         /* find the recovery area */
1175         recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery));
1176         if (recovery_head == TDB_OFF_ERR) {
1177                 return true;
1178         }
1179
1180         if (recovery_head == 0) {
1181                 /* we have never allocated a recovery record */
1182                 return false;
1183         }
1184
1185         /* read the recovery record */
1186         if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) {
1187                 return true;
1188         }
1189
1190         return (rec.magic == TDB_RECOVERY_MAGIC);
1191 }