]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_transaction.c
51aa2e11b00b38066f38afbc3e8b89b35c9e9f4b
[ccan] / ccan / tdb2 / tdb1_transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb1_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb1_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb1_write() calls. The hooked
48     transaction versions of tdb1_read() and tdb1_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb1_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb1_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB1_NOSYNC is passed to flags in tdb1_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB1_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb1_add_flags() transaction nesting is enabled.
90     It resets the TDB1_DISALLOW_NESTING flag, as both cannot be used together.
91     The default is that transaction nesting is allowed.
92     Note: this default may change in future versions of tdb.
93
94     Beware. when transactions are nested a transaction successfully
95     completed with tdb1_transaction_commit() can be silently unrolled later.
96
97   - if TDB1_DISALLOW_NESTING is passed to flags in tdb open, or added using
98     tdb1_add_flags() transaction nesting is disabled.
99     It resets the TDB1_ALLOW_NESTING flag, as both cannot be used together.
100     An attempt create a nested transaction will fail with TDB_ERR_EINVAL.
101     The default is that transaction nesting is allowed.
102     Note: this default may change in future versions of tdb.
103 */
104
105
106 /*
107   hold the context of any current transaction
108 */
109 struct tdb1_transaction {
110         /* we keep a mirrored copy of the tdb hash heads here so
111            tdb1_next_hash_chain() can operate efficiently */
112         uint32_t *hash_heads;
113
114         /* the original io methods - used to do IOs to the real db */
115         const struct tdb1_methods *io_methods;
116
117         /* the list of transaction blocks. When a block is first
118            written to, it gets created in this list */
119         uint8_t **blocks;
120         uint32_t num_blocks;
121         uint32_t block_size;      /* bytes in each block */
122         uint32_t last_block_size; /* number of valid bytes in the last block */
123
124         /* non-zero when an internal transaction error has
125            occurred. All write operations will then fail until the
126            transaction is ended */
127         int transaction_error;
128
129         /* when inside a transaction we need to keep track of any
130            nested tdb1_transaction_start() calls, as these are allowed,
131            but don't create a new transaction */
132         int nesting;
133
134         /* set when a prepare has already occurred */
135         bool prepared;
136         tdb1_off_t magic_offset;
137
138         /* old file size before transaction */
139         tdb1_len_t old_map_size;
140
141         /* did we expand in this transaction */
142         bool expanded;
143 };
144
145
146 /*
147   read while in a transaction. We need to check first if the data is in our list
148   of transaction elements, then if not do a real read
149 */
150 static int transaction1_read(struct tdb1_context *tdb, tdb1_off_t off, void *buf,
151                              tdb1_len_t len, int cv)
152 {
153         uint32_t blk;
154
155         /* break it down into block sized ops */
156         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
157                 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
158                 if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
159                         return -1;
160                 }
161                 len -= len2;
162                 off += len2;
163                 buf = (void *)(len2 + (char *)buf);
164         }
165
166         if (len == 0) {
167                 return 0;
168         }
169
170         blk = off / tdb->transaction->block_size;
171
172         /* see if we have it in the block list */
173         if (tdb->transaction->num_blocks <= blk ||
174             tdb->transaction->blocks[blk] == NULL) {
175                 /* nope, do a real read */
176                 if (tdb->transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
177                         goto fail;
178                 }
179                 return 0;
180         }
181
182         /* it is in the block list. Now check for the last block */
183         if (blk == tdb->transaction->num_blocks-1) {
184                 if (len > tdb->transaction->last_block_size) {
185                         goto fail;
186                 }
187         }
188
189         /* now copy it out of this block */
190         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
191         if (cv) {
192                 tdb1_convert(buf, len);
193         }
194         return 0;
195
196 fail:
197         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
198                                 "transaction_read: failed at off=%d len=%d",
199                                 off, len);
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
209                              const void *buf, tdb1_len_t len)
210 {
211         uint32_t blk;
212
213         /* Only a commit is allowed on a prepared transaction */
214         if (tdb->transaction->prepared) {
215                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
216                                         "transaction_write: transaction already"
217                                         " prepared, write not allowed");
218                 tdb->transaction->transaction_error = 1;
219                 return -1;
220         }
221
222         /* if the write is to a hash head, then update the transaction
223            hash heads */
224         if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
225             off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
226                 uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
227                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
228         }
229
230         /* break it up into block sized chunks */
231         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
232                 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
233                 if (transaction1_write(tdb, off, buf, len2) != 0) {
234                         return -1;
235                 }
236                 len -= len2;
237                 off += len2;
238                 if (buf != NULL) {
239                         buf = (const void *)(len2 + (const char *)buf);
240                 }
241         }
242
243         if (len == 0) {
244                 return 0;
245         }
246
247         blk = off / tdb->transaction->block_size;
248         off = off % tdb->transaction->block_size;
249
250         if (tdb->transaction->num_blocks <= blk) {
251                 uint8_t **new_blocks;
252                 /* expand the blocks array */
253                 if (tdb->transaction->blocks == NULL) {
254                         new_blocks = (uint8_t **)malloc(
255                                 (blk+1)*sizeof(uint8_t *));
256                 } else {
257                         new_blocks = (uint8_t **)realloc(
258                                 tdb->transaction->blocks,
259                                 (blk+1)*sizeof(uint8_t *));
260                 }
261                 if (new_blocks == NULL) {
262                         tdb->last_error = TDB_ERR_OOM;
263                         goto fail;
264                 }
265                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
266                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
267                 tdb->transaction->blocks = new_blocks;
268                 tdb->transaction->num_blocks = blk+1;
269                 tdb->transaction->last_block_size = 0;
270         }
271
272         /* allocate and fill a block? */
273         if (tdb->transaction->blocks[blk] == NULL) {
274                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
275                 if (tdb->transaction->blocks[blk] == NULL) {
276                         tdb->last_error = TDB_ERR_OOM;
277                         tdb->transaction->transaction_error = 1;
278                         return -1;
279                 }
280                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
281                         tdb1_len_t len2 = tdb->transaction->block_size;
282                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
283                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
284                         }
285                         if (tdb->transaction->io_methods->tdb1_read(tdb, blk * tdb->transaction->block_size,
286                                                                    tdb->transaction->blocks[blk],
287                                                                    len2, 0) != 0) {
288                                 SAFE_FREE(tdb->transaction->blocks[blk]);
289                                 tdb->last_error = TDB_ERR_IO;
290                                 goto fail;
291                         }
292                         if (blk == tdb->transaction->num_blocks-1) {
293                                 tdb->transaction->last_block_size = len2;
294                         }
295                 }
296         }
297
298         /* overwrite part of an existing block */
299         if (buf == NULL) {
300                 memset(tdb->transaction->blocks[blk] + off, 0, len);
301         } else {
302                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
303         }
304         if (blk == tdb->transaction->num_blocks-1) {
305                 if (len + off > tdb->transaction->last_block_size) {
306                         tdb->transaction->last_block_size = len + off;
307                 }
308         }
309
310         return 0;
311
312 fail:
313         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
314                    "transaction_write: failed at off=%d len=%d",
315                    (blk*tdb->transaction->block_size) + off, len);
316         tdb->transaction->transaction_error = 1;
317         return -1;
318 }
319
320
321 /*
322   write while in a transaction - this varient never expands the transaction blocks, it only
323   updates existing blocks. This means it cannot change the recovery size
324 */
325 static int transaction1_write_existing(struct tdb1_context *tdb, tdb1_off_t off,
326                                       const void *buf, tdb1_len_t len)
327 {
328         uint32_t blk;
329
330         /* break it up into block sized chunks */
331         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
332                 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
333                 if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
334                         return -1;
335                 }
336                 len -= len2;
337                 off += len2;
338                 if (buf != NULL) {
339                         buf = (const void *)(len2 + (const char *)buf);
340                 }
341         }
342
343         if (len == 0) {
344                 return 0;
345         }
346
347         blk = off / tdb->transaction->block_size;
348         off = off % tdb->transaction->block_size;
349
350         if (tdb->transaction->num_blocks <= blk ||
351             tdb->transaction->blocks[blk] == NULL) {
352                 return 0;
353         }
354
355         if (blk == tdb->transaction->num_blocks-1 &&
356             off + len > tdb->transaction->last_block_size) {
357                 if (off >= tdb->transaction->last_block_size) {
358                         return 0;
359                 }
360                 len = tdb->transaction->last_block_size - off;
361         }
362
363         /* overwrite part of an existing block */
364         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
365
366         return 0;
367 }
368
369
370 /*
371   accelerated hash chain head search, using the cached hash heads
372 */
373 static void transaction1_next_hash_chain(struct tdb1_context *tdb, uint32_t *chain)
374 {
375         uint32_t h = *chain;
376         for (;h < tdb->header.hash_size;h++) {
377                 /* the +1 takes account of the freelist */
378                 if (0 != tdb->transaction->hash_heads[h+1]) {
379                         break;
380                 }
381         }
382         (*chain) = h;
383 }
384
385 /*
386   out of bounds check during a transaction
387 */
388 static int transaction1_oob(struct tdb1_context *tdb, tdb1_off_t len, int probe)
389 {
390         if (len <= tdb->map_size) {
391                 return 0;
392         }
393         tdb->last_error = TDB_ERR_IO;
394         return -1;
395 }
396
397 /*
398   transaction version of tdb1_expand().
399 */
400 static int transaction1_expand_file(struct tdb1_context *tdb, tdb1_off_t size,
401                                     tdb1_off_t addition)
402 {
403         /* add a write to the transaction elements, so subsequent
404            reads see the zero data */
405         if (transaction1_write(tdb, size, NULL, addition) != 0) {
406                 return -1;
407         }
408
409         tdb->transaction->expanded = true;
410
411         return 0;
412 }
413
414 static const struct tdb1_methods transaction1_methods = {
415         transaction1_read,
416         transaction1_write,
417         transaction1_next_hash_chain,
418         transaction1_oob,
419         transaction1_expand_file,
420 };
421
422
423 /*
424   start a tdb transaction. No token is returned, as only a single
425   transaction is allowed to be pending per tdb1_context
426 */
427 static int _tdb1_transaction_start(struct tdb1_context *tdb)
428 {
429         /* some sanity checks */
430         if (tdb->read_only || (tdb->flags & TDB1_INTERNAL) || tdb->traverse_read) {
431                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
432                                         "tdb1_transaction_start: cannot start a"
433                                         " transaction on a read-only or"
434                                         " internal db");
435                 return -1;
436         }
437
438         /* cope with nested tdb1_transaction_start() calls */
439         if (tdb->transaction != NULL) {
440                 if (!(tdb->flags & TDB1_ALLOW_NESTING)) {
441                         tdb->last_error = TDB_ERR_EINVAL;
442                         return -1;
443                 }
444                 tdb->transaction->nesting++;
445                 return 0;
446         }
447
448         if (tdb1_have_extra_locks(tdb)) {
449                 /* the caller must not have any locks when starting a
450                    transaction as otherwise we'll be screwed by lack
451                    of nested locks in posix */
452                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
453                                         "tdb1_transaction_start: cannot start a"
454                                         " transaction with locks held");
455                 return -1;
456         }
457
458         if (tdb->travlocks.next != NULL) {
459                 /* you cannot use transactions inside a traverse (although you can use
460                    traverse inside a transaction) as otherwise you can end up with
461                    deadlock */
462                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
463                                         "tdb1_transaction_start: cannot start a"
464                                         " transaction within a traverse");
465                 return -1;
466         }
467
468         tdb->transaction = (struct tdb1_transaction *)
469                 calloc(sizeof(struct tdb1_transaction), 1);
470         if (tdb->transaction == NULL) {
471                 tdb->last_error = TDB_ERR_OOM;
472                 return -1;
473         }
474
475         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
476         tdb->transaction->block_size = tdb->page_size;
477
478         /* get the transaction write lock. This is a blocking lock. As
479            discussed with Volker, there are a number of ways we could
480            make this async, which we will probably do in the future */
481         if (tdb1_transaction_lock(tdb, F_WRLCK, TDB_LOCK_WAIT) == -1) {
482                 SAFE_FREE(tdb->transaction->blocks);
483                 SAFE_FREE(tdb->transaction);
484                 return -1;
485         }
486
487         /* get a read lock from the freelist to the end of file. This
488            is upgraded to a write lock during the commit */
489         if (tdb1_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
490                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
491                            "tdb1_transaction_start: failed to get hash locks");
492                 goto fail_allrecord_lock;
493         }
494
495         /* setup a copy of the hash table heads so the hash scan in
496            traverse can be fast */
497         tdb->transaction->hash_heads = (uint32_t *)
498                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
499         if (tdb->transaction->hash_heads == NULL) {
500                 tdb->last_error = TDB_ERR_OOM;
501                 goto fail;
502         }
503         if (tdb->methods->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->transaction->hash_heads,
504                                    TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
505                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
506                            "tdb1_transaction_start: failed to read hash heads");
507                 goto fail;
508         }
509
510         /* make sure we know about any file expansions already done by
511            anyone else */
512         tdb->methods->tdb1_oob(tdb, tdb->map_size + 1, 1);
513         tdb->transaction->old_map_size = tdb->map_size;
514
515         /* finally hook the io methods, replacing them with
516            transaction specific methods */
517         tdb->transaction->io_methods = tdb->methods;
518         tdb->methods = &transaction1_methods;
519
520         return 0;
521
522 fail:
523         tdb1_allrecord_unlock(tdb, F_RDLCK);
524 fail_allrecord_lock:
525         tdb1_transaction_unlock(tdb, F_WRLCK);
526         SAFE_FREE(tdb->transaction->blocks);
527         SAFE_FREE(tdb->transaction->hash_heads);
528         SAFE_FREE(tdb->transaction);
529         return -1;
530 }
531
532 int tdb1_transaction_start(struct tdb1_context *tdb)
533 {
534         return _tdb1_transaction_start(tdb);
535 }
536
537 /*
538   sync to disk
539 */
540 static int transaction1_sync(struct tdb1_context *tdb, tdb1_off_t offset, tdb1_len_t length)
541 {
542         if (tdb->flags & TDB1_NOSYNC) {
543                 return 0;
544         }
545
546 #if HAVE_FDATASYNC
547         if (fdatasync(tdb->fd) != 0) {
548 #else
549         if (fsync(tdb->fd) != 0) {
550 #endif
551                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
552                                         "tdb1_transaction: fsync failed");
553                 return -1;
554         }
555 #if HAVE_MMAP
556         if (tdb->map_ptr) {
557                 tdb1_off_t moffset = offset & ~(tdb->page_size-1);
558                 if (msync(moffset + (char *)tdb->map_ptr,
559                           length + (offset - moffset), MS_SYNC) != 0) {
560                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
561                                                 "tdb1_transaction:"
562                                                 " msync failed - %s",
563                                                 strerror(errno));
564                         return -1;
565                 }
566         }
567 #endif
568         return 0;
569 }
570
571
572 static int _tdb1_transaction_cancel(struct tdb1_context *tdb)
573 {
574         int i, ret = 0;
575
576         if (tdb->transaction == NULL) {
577                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
578                                         "tdb1_transaction_cancel:"
579                                         " no transaction");
580                 return -1;
581         }
582
583         if (tdb->transaction->nesting != 0) {
584                 tdb->transaction->transaction_error = 1;
585                 tdb->transaction->nesting--;
586                 return 0;
587         }
588
589         tdb->map_size = tdb->transaction->old_map_size;
590
591         /* free all the transaction blocks */
592         for (i=0;i<tdb->transaction->num_blocks;i++) {
593                 if (tdb->transaction->blocks[i] != NULL) {
594                         free(tdb->transaction->blocks[i]);
595                 }
596         }
597         SAFE_FREE(tdb->transaction->blocks);
598
599         if (tdb->transaction->magic_offset) {
600                 const struct tdb1_methods *methods = tdb->transaction->io_methods;
601                 const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
602
603                 /* remove the recovery marker */
604                 if (methods->tdb1_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
605                 transaction1_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
606                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
607                                    "tdb1_transaction_cancel: failed to"
608                                    " remove recovery magic");
609                         ret = -1;
610                 }
611         }
612
613         /* This also removes the OPEN_LOCK, if we have it. */
614         tdb1_release_transaction_locks(tdb);
615
616         /* restore the normal io methods */
617         tdb->methods = tdb->transaction->io_methods;
618
619         SAFE_FREE(tdb->transaction->hash_heads);
620         SAFE_FREE(tdb->transaction);
621
622         return ret;
623 }
624
625 /*
626   cancel the current transaction
627 */
628 int tdb1_transaction_cancel(struct tdb1_context *tdb)
629 {
630         return _tdb1_transaction_cancel(tdb);
631 }
632
633 /*
634   work out how much space the linearised recovery data will consume
635 */
636 static tdb1_len_t tdb1_recovery_size(struct tdb1_context *tdb)
637 {
638         tdb1_len_t recovery_size = 0;
639         int i;
640
641         recovery_size = sizeof(uint32_t);
642         for (i=0;i<tdb->transaction->num_blocks;i++) {
643                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
644                         break;
645                 }
646                 if (tdb->transaction->blocks[i] == NULL) {
647                         continue;
648                 }
649                 recovery_size += 2*sizeof(tdb1_off_t);
650                 if (i == tdb->transaction->num_blocks-1) {
651                         recovery_size += tdb->transaction->last_block_size;
652                 } else {
653                         recovery_size += tdb->transaction->block_size;
654                 }
655         }
656
657         return recovery_size;
658 }
659
660 int tdb1_recovery_area(struct tdb1_context *tdb,
661                       const struct tdb1_methods *methods,
662                       tdb1_off_t *recovery_offset,
663                       struct tdb1_record *rec)
664 {
665         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, recovery_offset) == -1) {
666                 return -1;
667         }
668
669         if (*recovery_offset == 0) {
670                 rec->rec_len = 0;
671                 return 0;
672         }
673
674         if (methods->tdb1_read(tdb, *recovery_offset, rec, sizeof(*rec),
675                               TDB1_DOCONV()) == -1) {
676                 return -1;
677         }
678
679         /* ignore invalid recovery regions: can happen in crash */
680         if (rec->magic != TDB1_RECOVERY_MAGIC &&
681             rec->magic != TDB1_RECOVERY_INVALID_MAGIC) {
682                 *recovery_offset = 0;
683                 rec->rec_len = 0;
684         }
685         return 0;
686 }
687
688 /*
689   allocate the recovery area, or use an existing recovery area if it is
690   large enough
691 */
692 static int tdb1_recovery_allocate(struct tdb1_context *tdb,
693                                  tdb1_len_t *recovery_size,
694                                  tdb1_off_t *recovery_offset,
695                                  tdb1_len_t *recovery_max_size)
696 {
697         struct tdb1_record rec;
698         const struct tdb1_methods *methods = tdb->transaction->io_methods;
699         tdb1_off_t recovery_head;
700
701         if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
702                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
703                            "tdb1_recovery_allocate:"
704                            " failed to read recovery head");
705                 return -1;
706         }
707
708         *recovery_size = tdb1_recovery_size(tdb);
709
710         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
711                 /* it fits in the existing area */
712                 *recovery_max_size = rec.rec_len;
713                 *recovery_offset = recovery_head;
714                 return 0;
715         }
716
717         /* we need to free up the old recovery area, then allocate a
718            new one at the end of the file. Note that we cannot use
719            tdb1_allocate() to allocate the new one as that might return
720            us an area that is being currently used (as of the start of
721            the transaction) */
722         if (recovery_head != 0) {
723                 if (tdb1_free(tdb, recovery_head, &rec) == -1) {
724                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
725                                    "tdb1_recovery_allocate: failed to free"
726                                    " previous recovery area");
727                         return -1;
728                 }
729         }
730
731         /* the tdb1_free() call might have increased the recovery size */
732         *recovery_size = tdb1_recovery_size(tdb);
733
734         /* round up to a multiple of page size */
735         *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
736         *recovery_offset = tdb->map_size;
737         recovery_head = *recovery_offset;
738
739         if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
740                                      (tdb->map_size - tdb->transaction->old_map_size) +
741                                      sizeof(rec) + *recovery_max_size) == -1) {
742                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
743                            "tdb1_recovery_allocate:"
744                            " failed to create recovery area");
745                 return -1;
746         }
747
748         /* remap the file (if using mmap) */
749         methods->tdb1_oob(tdb, tdb->map_size + 1, 1);
750
751         /* we have to reset the old map size so that we don't try to expand the file
752            again in the transaction commit, which would destroy the recovery area */
753         tdb->transaction->old_map_size = tdb->map_size;
754
755         /* write the recovery header offset and sync - we can sync without a race here
756            as the magic ptr in the recovery record has not been set */
757         TDB1_CONV(recovery_head);
758         if (methods->tdb1_write(tdb, TDB1_RECOVERY_HEAD,
759                                &recovery_head, sizeof(tdb1_off_t)) == -1) {
760                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
761                            "tdb1_recovery_allocate:"
762                            " failed to write recovery head");
763                 return -1;
764         }
765         if (transaction1_write_existing(tdb, TDB1_RECOVERY_HEAD, &recovery_head, sizeof(tdb1_off_t)) == -1) {
766                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
767                            "tdb1_recovery_allocate:"
768                            " failed to write recovery head");
769                 return -1;
770         }
771
772         return 0;
773 }
774
775
776 /*
777   setup the recovery data that will be used on a crash during commit
778 */
779 static int transaction1_setup_recovery(struct tdb1_context *tdb,
780                                        tdb1_off_t *magic_offset)
781 {
782         tdb1_len_t recovery_size;
783         unsigned char *data, *p;
784         const struct tdb1_methods *methods = tdb->transaction->io_methods;
785         struct tdb1_record *rec;
786         tdb1_off_t recovery_offset, recovery_max_size;
787         tdb1_off_t old_map_size = tdb->transaction->old_map_size;
788         uint32_t magic, tailer;
789         int i;
790
791         /*
792           check that the recovery area has enough space
793         */
794         if (tdb1_recovery_allocate(tdb, &recovery_size,
795                                   &recovery_offset, &recovery_max_size) == -1) {
796                 return -1;
797         }
798
799         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
800         if (data == NULL) {
801                 tdb->last_error = TDB_ERR_OOM;
802                 return -1;
803         }
804
805         rec = (struct tdb1_record *)data;
806         memset(rec, 0, sizeof(*rec));
807
808         rec->magic    = TDB1_RECOVERY_INVALID_MAGIC;
809         rec->data_len = recovery_size;
810         rec->rec_len  = recovery_max_size;
811         rec->key_len  = old_map_size;
812         TDB1_CONV(*rec);
813
814         /* build the recovery data into a single blob to allow us to do a single
815            large write, which should be more efficient */
816         p = data + sizeof(*rec);
817         for (i=0;i<tdb->transaction->num_blocks;i++) {
818                 tdb1_off_t offset;
819                 tdb1_len_t length;
820
821                 if (tdb->transaction->blocks[i] == NULL) {
822                         continue;
823                 }
824
825                 offset = i * tdb->transaction->block_size;
826                 length = tdb->transaction->block_size;
827                 if (i == tdb->transaction->num_blocks-1) {
828                         length = tdb->transaction->last_block_size;
829                 }
830
831                 if (offset >= old_map_size) {
832                         continue;
833                 }
834                 if (offset + length > tdb->transaction->old_map_size) {
835                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
836                                                 TDB_LOG_ERROR,
837                                                 "tdb1_transaction_setup_recovery: transaction data over new region boundary");
838                         free(data);
839                         return -1;
840                 }
841                 memcpy(p, &offset, 4);
842                 memcpy(p+4, &length, 4);
843                 if (TDB1_DOCONV()) {
844                         tdb1_convert(p, 8);
845                 }
846                 /* the recovery area contains the old data, not the
847                    new data, so we have to call the original tdb1_read
848                    method to get it */
849                 if (methods->tdb1_read(tdb, offset, p + 8, length, 0) != 0) {
850                         free(data);
851                         tdb->last_error = TDB_ERR_IO;
852                         return -1;
853                 }
854                 p += 8 + length;
855         }
856
857         /* and the tailer */
858         tailer = sizeof(*rec) + recovery_max_size;
859         memcpy(p, &tailer, 4);
860         if (TDB1_DOCONV()) {
861                 tdb1_convert(p, 4);
862         }
863
864         /* write the recovery data to the recovery area */
865         if (methods->tdb1_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
866                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
867                            "tdb1_transaction_setup_recovery:"
868                            " failed to write recovery data");
869                 free(data);
870                 return -1;
871         }
872         if (transaction1_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
873                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
874                            "tdb1_transaction_setup_recovery: failed to write"
875                            " secondary recovery data");
876                 free(data);
877                 return -1;
878         }
879
880         /* as we don't have ordered writes, we have to sync the recovery
881            data before we update the magic to indicate that the recovery
882            data is present */
883         if (transaction1_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
884                 free(data);
885                 return -1;
886         }
887
888         free(data);
889
890         magic = TDB1_RECOVERY_MAGIC;
891         TDB1_CONV(magic);
892
893         *magic_offset = recovery_offset + offsetof(struct tdb1_record, magic);
894
895         if (methods->tdb1_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
896                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
897                            "tdb1_transaction_setup_recovery:"
898                            " failed to write recovery magic");
899                 return -1;
900         }
901         if (transaction1_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
902                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
903                            "tdb1_transaction_setup_recovery:"
904                            " failed to write secondary recovery magic");
905                 return -1;
906         }
907
908         /* ensure the recovery magic marker is on disk */
909         if (transaction1_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
910                 return -1;
911         }
912
913         return 0;
914 }
915
916 static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
917 {
918         const struct tdb1_methods *methods;
919
920         if (tdb->transaction == NULL) {
921                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
922                                         "tdb1_transaction_prepare_commit:"
923                                         " no transaction");
924                 return -1;
925         }
926
927         if (tdb->transaction->prepared) {
928                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
929                                         "tdb1_transaction_prepare_commit:"
930                                         " transaction already prepared");
931                 _tdb1_transaction_cancel(tdb);
932                 return -1;
933         }
934
935         if (tdb->transaction->transaction_error) {
936                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
937                                         "tdb1_transaction_prepare_commit:"
938                                         " transaction error pending");
939                 _tdb1_transaction_cancel(tdb);
940                 return -1;
941         }
942
943
944         if (tdb->transaction->nesting != 0) {
945                 return 0;
946         }
947
948         /* check for a null transaction */
949         if (tdb->transaction->blocks == NULL) {
950                 return 0;
951         }
952
953         methods = tdb->transaction->io_methods;
954
955         /* if there are any locks pending then the caller has not
956            nested their locks properly, so fail the transaction */
957         if (tdb1_have_extra_locks(tdb)) {
958                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
959                                         "tdb1_transaction_prepare_commit:"
960                                         " locks pending on commit");
961                 _tdb1_transaction_cancel(tdb);
962                 return -1;
963         }
964
965         /* upgrade the main transaction lock region to a write lock */
966         if (tdb1_allrecord_upgrade(tdb) == -1) {
967                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
968                            "tdb1_transaction_prepare_commit:"
969                            " failed to upgrade hash locks");
970                 _tdb1_transaction_cancel(tdb);
971                 return -1;
972         }
973
974         /* get the open lock - this prevents new users attaching to the database
975            during the commit */
976         if (tdb1_nest_lock(tdb, TDB1_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
977                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
978                            "tdb1_transaction_prepare_commit:"
979                            " failed to get open lock");
980                 _tdb1_transaction_cancel(tdb);
981                 return -1;
982         }
983
984         if (!(tdb->flags & TDB1_NOSYNC)) {
985                 /* write the recovery data to the end of the file */
986                 if (transaction1_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
987                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
988                                    "tdb1_transaction_prepare_commit:"
989                                    " failed to setup recovery data");
990                         _tdb1_transaction_cancel(tdb);
991                         return -1;
992                 }
993         }
994
995         tdb->transaction->prepared = true;
996
997         /* expand the file to the new size if needed */
998         if (tdb->map_size != tdb->transaction->old_map_size) {
999                 if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
1000                                              tdb->map_size -
1001                                              tdb->transaction->old_map_size) == -1) {
1002                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1003                                    "tdb1_transaction_prepare_commit:"
1004                                    " expansion failed");
1005                         _tdb1_transaction_cancel(tdb);
1006                         return -1;
1007                 }
1008                 tdb->map_size = tdb->transaction->old_map_size;
1009                 methods->tdb1_oob(tdb, tdb->map_size + 1, 1);
1010         }
1011
1012         /* Keep the open lock until the actual commit */
1013
1014         return 0;
1015 }
1016
1017 /*
1018    prepare to commit the current transaction
1019 */
1020 int tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
1021 {
1022         return _tdb1_transaction_prepare_commit(tdb);
1023 }
1024
1025 /* A repack is worthwhile if the largest is less than half total free. */
1026 static bool repack_worthwhile(struct tdb1_context *tdb)
1027 {
1028         tdb1_off_t ptr;
1029         struct tdb1_record rec;
1030         tdb1_len_t total = 0, largest = 0;
1031
1032         if (tdb1_ofs_read(tdb, TDB1_FREELIST_TOP, &ptr) == -1) {
1033                 return false;
1034         }
1035
1036         while (ptr != 0 && tdb1_rec_free_read(tdb, ptr, &rec) == 0) {
1037                 total += rec.rec_len;
1038                 if (rec.rec_len > largest) {
1039                         largest = rec.rec_len;
1040                 }
1041                 ptr = rec.next;
1042         }
1043
1044         return total > largest * 2;
1045 }
1046
1047 /*
1048   commit the current transaction
1049 */
1050 int tdb1_transaction_commit(struct tdb1_context *tdb)
1051 {
1052         const struct tdb1_methods *methods;
1053         int i;
1054         bool need_repack = false;
1055
1056         if (tdb->transaction == NULL) {
1057                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1058                                         "tdb1_transaction_commit:"
1059                                         " no transaction");
1060                 return -1;
1061         }
1062
1063         if (tdb->transaction->transaction_error) {
1064                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
1065                                         "tdb1_transaction_commit:"
1066                                         " transaction error pending");
1067                 _tdb1_transaction_cancel(tdb);
1068                 return -1;
1069         }
1070
1071
1072         if (tdb->transaction->nesting != 0) {
1073                 tdb->transaction->nesting--;
1074                 return 0;
1075         }
1076
1077         /* check for a null transaction */
1078         if (tdb->transaction->blocks == NULL) {
1079                 _tdb1_transaction_cancel(tdb);
1080                 return 0;
1081         }
1082
1083         if (!tdb->transaction->prepared) {
1084                 int ret = _tdb1_transaction_prepare_commit(tdb);
1085                 if (ret)
1086                         return ret;
1087         }
1088
1089         methods = tdb->transaction->io_methods;
1090
1091         /* perform all the writes */
1092         for (i=0;i<tdb->transaction->num_blocks;i++) {
1093                 tdb1_off_t offset;
1094                 tdb1_len_t length;
1095
1096                 if (tdb->transaction->blocks[i] == NULL) {
1097                         continue;
1098                 }
1099
1100                 offset = i * tdb->transaction->block_size;
1101                 length = tdb->transaction->block_size;
1102                 if (i == tdb->transaction->num_blocks-1) {
1103                         length = tdb->transaction->last_block_size;
1104                 }
1105
1106                 if (methods->tdb1_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1107                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1108                                    "tdb1_transaction_commit:"
1109                                    " write failed during commit");
1110
1111                         /* we've overwritten part of the data and
1112                            possibly expanded the file, so we need to
1113                            run the crash recovery code */
1114                         tdb->methods = methods;
1115                         tdb1_transaction_recover(tdb);
1116
1117                         _tdb1_transaction_cancel(tdb);
1118
1119                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1120                                    "tdb1_transaction_commit: write failed");
1121                         return -1;
1122                 }
1123                 SAFE_FREE(tdb->transaction->blocks[i]);
1124         }
1125
1126         /* Do this before we drop lock or blocks. */
1127         if (tdb->transaction->expanded) {
1128                 need_repack = repack_worthwhile(tdb);
1129         }
1130
1131         SAFE_FREE(tdb->transaction->blocks);
1132         tdb->transaction->num_blocks = 0;
1133
1134         /* ensure the new data is on disk */
1135         if (transaction1_sync(tdb, 0, tdb->map_size) == -1) {
1136                 return -1;
1137         }
1138
1139         /*
1140           TODO: maybe write to some dummy hdr field, or write to magic
1141           offset without mmap, before the last sync, instead of the
1142           utime() call
1143         */
1144
1145         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1146            don't change the mtime of the file, this means the file may
1147            not be backed up (as tdb rounding to block sizes means that
1148            file size changes are quite rare too). The following forces
1149            mtime changes when a transaction completes */
1150 #if HAVE_UTIME
1151         utime(tdb->name, NULL);
1152 #endif
1153
1154         /* use a transaction cancel to free memory and remove the
1155            transaction locks */
1156         _tdb1_transaction_cancel(tdb);
1157
1158         if (need_repack) {
1159                 return tdb1_repack(tdb);
1160         }
1161
1162         return 0;
1163 }
1164
1165
1166 /*
1167   recover from an aborted transaction. Must be called with exclusive
1168   database write access already established (including the open
1169   lock to prevent new processes attaching)
1170 */
1171 int tdb1_transaction_recover(struct tdb1_context *tdb)
1172 {
1173         tdb1_off_t recovery_head, recovery_eof;
1174         unsigned char *data, *p;
1175         uint32_t zero = 0;
1176         struct tdb1_record rec;
1177
1178         /* find the recovery area */
1179         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1180                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1181                            "tdb1_transaction_recover:"
1182                            " failed to read recovery head");
1183                 return -1;
1184         }
1185
1186         if (recovery_head == 0) {
1187                 /* we have never allocated a recovery record */
1188                 return 0;
1189         }
1190
1191         /* read the recovery record */
1192         if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
1193                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1194                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1195                            "tdb1_transaction_recover:"
1196                            " failed to read recovery record");
1197                 return -1;
1198         }
1199
1200         if (rec.magic != TDB1_RECOVERY_MAGIC) {
1201                 /* there is no valid recovery data */
1202                 return 0;
1203         }
1204
1205         if (tdb->read_only) {
1206                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1207                                         "tdb1_transaction_recover:"
1208                                         " attempt to recover read only"
1209                                         " database");
1210                 return -1;
1211         }
1212
1213         recovery_eof = rec.key_len;
1214
1215         data = (unsigned char *)malloc(rec.data_len);
1216         if (data == NULL) {
1217                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1218                                         "tdb1_transaction_recover:"
1219                                         " failed to allocate recovery data");
1220                 return -1;
1221         }
1222
1223         /* read the full recovery data */
1224         if (tdb->methods->tdb1_read(tdb, recovery_head + sizeof(rec), data,
1225                                    rec.data_len, 0) == -1) {
1226                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1227                            "tdb1_transaction_recover:"
1228                            " failed to read recovery data");
1229                 return -1;
1230         }
1231
1232         /* recover the file data */
1233         p = data;
1234         while (p+8 < data + rec.data_len) {
1235                 uint32_t ofs, len;
1236                 if (TDB1_DOCONV()) {
1237                         tdb1_convert(p, 8);
1238                 }
1239                 memcpy(&ofs, p, 4);
1240                 memcpy(&len, p+4, 4);
1241
1242                 if (tdb->methods->tdb1_write(tdb, ofs, p+8, len) == -1) {
1243                         free(data);
1244                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1245                                    "tdb1_transaction_recover: failed to recover"
1246                                    " %d bytes at offset %d", len, ofs);
1247                         return -1;
1248                 }
1249                 p += 8 + len;
1250         }
1251
1252         free(data);
1253
1254         if (transaction1_sync(tdb, 0, tdb->map_size) == -1) {
1255                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1256                            "tdb1_transaction_recover: failed to sync recovery");
1257                 return -1;
1258         }
1259
1260         /* if the recovery area is after the recovered eof then remove it */
1261         if (recovery_eof <= recovery_head) {
1262                 if (tdb1_ofs_write(tdb, TDB1_RECOVERY_HEAD, &zero) == -1) {
1263                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1264                                    "tdb1_transaction_recover: failed to remove"
1265                                    " recovery head");
1266                         return -1;
1267                 }
1268         }
1269
1270         /* remove the recovery magic */
1271         if (tdb1_ofs_write(tdb, recovery_head + offsetof(struct tdb1_record, magic),
1272                           &zero) == -1) {
1273                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1274                            "tdb1_transaction_recover: failed to remove"
1275                            " recovery magic");
1276                 return -1;
1277         }
1278
1279         if (transaction1_sync(tdb, 0, recovery_eof) == -1) {
1280                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1281                            "tdb1_transaction_recover:"
1282                            " failed to sync2 recovery");
1283                 return -1;
1284         }
1285
1286         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1287                    "tdb1_transaction_recover: recovered %d byte database",
1288                    recovery_eof);
1289
1290         /* all done */
1291         return 0;
1292 }
1293
1294 /* Any I/O failures we say "needs recovery". */
1295 bool tdb1_needs_recovery(struct tdb1_context *tdb)
1296 {
1297         tdb1_off_t recovery_head;
1298         struct tdb1_record rec;
1299
1300         /* find the recovery area */
1301         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1302                 return true;
1303         }
1304
1305         if (recovery_head == 0) {
1306                 /* we have never allocated a recovery record */
1307                 return false;
1308         }
1309
1310         /* read the recovery record */
1311         if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
1312                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1313                 return true;
1314         }
1315
1316         return (rec.magic == TDB1_RECOVERY_MAGIC);
1317 }