]> git.ozlabs.org Git - ccan/blob - ccan/tdb2/tdb1_transaction.c
tdb2: Make tdb1 use the tdb_file structure.
[ccan] / ccan / tdb2 / tdb1_transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb1_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb1_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb1_write() calls. The hooked
48     transaction versions of tdb1_read() and tdb1_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb1_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb1_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb1_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb1_add_flags(), transaction nesting is enabled.
90     The default is that transaction nesting is NOT allowed.
91
92     Beware: when transactions are nested, a transaction successfully
93     completed with tdb1_transaction_commit() can be silently unrolled later.
94 */
95
96
97 /*
98   hold the context of any current transaction
99 */
100 struct tdb1_transaction {
101         /* we keep a mirrored copy of the tdb hash heads here so
102            tdb1_next_hash_chain() can operate efficiently */
103         uint32_t *hash_heads;
104
105         /* the original io methods - used to do IOs to the real db */
106         const struct tdb1_methods *io_methods;
107
108         /* the list of transaction blocks. When a block is first
109            written to, it gets created in this list */
110         uint8_t **blocks;
111         uint32_t num_blocks;
112         uint32_t block_size;      /* bytes in each block */
113         uint32_t last_block_size; /* number of valid bytes in the last block */
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb1_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* set when a prepare has already occurred */
126         bool prepared;
127         tdb1_off_t magic_offset;
128
129         /* old file size before transaction */
130         tdb1_len_t old_map_size;
131
132         /* did we expand in this transaction */
133         bool expanded;
134 };
135
136
137 /*
138   read while in a transaction. We need to check first if the data is in our list
139   of transaction elements, then if not do a real read
140 */
141 static int transaction1_read(struct tdb1_context *tdb, tdb1_off_t off, void *buf,
142                              tdb1_len_t len, int cv)
143 {
144         uint32_t blk;
145
146         /* break it down into block sized ops */
147         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
148                 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
149                 if (transaction1_read(tdb, off, buf, len2, cv) != 0) {
150                         return -1;
151                 }
152                 len -= len2;
153                 off += len2;
154                 buf = (void *)(len2 + (char *)buf);
155         }
156
157         if (len == 0) {
158                 return 0;
159         }
160
161         blk = off / tdb->transaction->block_size;
162
163         /* see if we have it in the block list */
164         if (tdb->transaction->num_blocks <= blk ||
165             tdb->transaction->blocks[blk] == NULL) {
166                 /* nope, do a real read */
167                 if (tdb->transaction->io_methods->tdb1_read(tdb, off, buf, len, cv) != 0) {
168                         goto fail;
169                 }
170                 return 0;
171         }
172
173         /* it is in the block list. Now check for the last block */
174         if (blk == tdb->transaction->num_blocks-1) {
175                 if (len > tdb->transaction->last_block_size) {
176                         goto fail;
177                 }
178         }
179
180         /* now copy it out of this block */
181         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
182         if (cv) {
183                 tdb1_convert(buf, len);
184         }
185         return 0;
186
187 fail:
188         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
189                                 "transaction_read: failed at off=%d len=%d",
190                                 off, len);
191         tdb->transaction->transaction_error = 1;
192         return -1;
193 }
194
195
196 /*
197   write while in a transaction
198 */
199 static int transaction1_write(struct tdb1_context *tdb, tdb1_off_t off,
200                              const void *buf, tdb1_len_t len)
201 {
202         uint32_t blk;
203
204         /* Only a commit is allowed on a prepared transaction */
205         if (tdb->transaction->prepared) {
206                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
207                                         "transaction_write: transaction already"
208                                         " prepared, write not allowed");
209                 tdb->transaction->transaction_error = 1;
210                 return -1;
211         }
212
213         /* if the write is to a hash head, then update the transaction
214            hash heads */
215         if (len == sizeof(tdb1_off_t) && off >= TDB1_FREELIST_TOP &&
216             off < TDB1_FREELIST_TOP+TDB1_HASHTABLE_SIZE(tdb)) {
217                 uint32_t chain = (off-TDB1_FREELIST_TOP) / sizeof(tdb1_off_t);
218                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
219         }
220
221         /* break it up into block sized chunks */
222         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
223                 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
224                 if (transaction1_write(tdb, off, buf, len2) != 0) {
225                         return -1;
226                 }
227                 len -= len2;
228                 off += len2;
229                 if (buf != NULL) {
230                         buf = (const void *)(len2 + (const char *)buf);
231                 }
232         }
233
234         if (len == 0) {
235                 return 0;
236         }
237
238         blk = off / tdb->transaction->block_size;
239         off = off % tdb->transaction->block_size;
240
241         if (tdb->transaction->num_blocks <= blk) {
242                 uint8_t **new_blocks;
243                 /* expand the blocks array */
244                 if (tdb->transaction->blocks == NULL) {
245                         new_blocks = (uint8_t **)malloc(
246                                 (blk+1)*sizeof(uint8_t *));
247                 } else {
248                         new_blocks = (uint8_t **)realloc(
249                                 tdb->transaction->blocks,
250                                 (blk+1)*sizeof(uint8_t *));
251                 }
252                 if (new_blocks == NULL) {
253                         tdb->last_error = TDB_ERR_OOM;
254                         goto fail;
255                 }
256                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
257                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
258                 tdb->transaction->blocks = new_blocks;
259                 tdb->transaction->num_blocks = blk+1;
260                 tdb->transaction->last_block_size = 0;
261         }
262
263         /* allocate and fill a block? */
264         if (tdb->transaction->blocks[blk] == NULL) {
265                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
266                 if (tdb->transaction->blocks[blk] == NULL) {
267                         tdb->last_error = TDB_ERR_OOM;
268                         tdb->transaction->transaction_error = 1;
269                         return -1;
270                 }
271                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
272                         tdb1_len_t len2 = tdb->transaction->block_size;
273                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
274                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
275                         }
276                         if (tdb->transaction->io_methods->tdb1_read(tdb, blk * tdb->transaction->block_size,
277                                                                    tdb->transaction->blocks[blk],
278                                                                    len2, 0) != 0) {
279                                 SAFE_FREE(tdb->transaction->blocks[blk]);
280                                 tdb->last_error = TDB_ERR_IO;
281                                 goto fail;
282                         }
283                         if (blk == tdb->transaction->num_blocks-1) {
284                                 tdb->transaction->last_block_size = len2;
285                         }
286                 }
287         }
288
289         /* overwrite part of an existing block */
290         if (buf == NULL) {
291                 memset(tdb->transaction->blocks[blk] + off, 0, len);
292         } else {
293                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
294         }
295         if (blk == tdb->transaction->num_blocks-1) {
296                 if (len + off > tdb->transaction->last_block_size) {
297                         tdb->transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
305                    "transaction_write: failed at off=%d len=%d",
306                    (blk*tdb->transaction->block_size) + off, len);
307         tdb->transaction->transaction_error = 1;
308         return -1;
309 }
310
311
312 /*
313   write while in a transaction - this varient never expands the transaction blocks, it only
314   updates existing blocks. This means it cannot change the recovery size
315 */
316 static int transaction1_write_existing(struct tdb1_context *tdb, tdb1_off_t off,
317                                       const void *buf, tdb1_len_t len)
318 {
319         uint32_t blk;
320
321         /* break it up into block sized chunks */
322         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
323                 tdb1_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
324                 if (transaction1_write_existing(tdb, off, buf, len2) != 0) {
325                         return -1;
326                 }
327                 len -= len2;
328                 off += len2;
329                 if (buf != NULL) {
330                         buf = (const void *)(len2 + (const char *)buf);
331                 }
332         }
333
334         if (len == 0) {
335                 return 0;
336         }
337
338         blk = off / tdb->transaction->block_size;
339         off = off % tdb->transaction->block_size;
340
341         if (tdb->transaction->num_blocks <= blk ||
342             tdb->transaction->blocks[blk] == NULL) {
343                 return 0;
344         }
345
346         if (blk == tdb->transaction->num_blocks-1 &&
347             off + len > tdb->transaction->last_block_size) {
348                 if (off >= tdb->transaction->last_block_size) {
349                         return 0;
350                 }
351                 len = tdb->transaction->last_block_size - off;
352         }
353
354         /* overwrite part of an existing block */
355         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
356
357         return 0;
358 }
359
360
361 /*
362   accelerated hash chain head search, using the cached hash heads
363 */
364 static void transaction1_next_hash_chain(struct tdb1_context *tdb, uint32_t *chain)
365 {
366         uint32_t h = *chain;
367         for (;h < tdb->header.hash_size;h++) {
368                 /* the +1 takes account of the freelist */
369                 if (0 != tdb->transaction->hash_heads[h+1]) {
370                         break;
371                 }
372         }
373         (*chain) = h;
374 }
375
376 /*
377   out of bounds check during a transaction
378 */
379 static int transaction1_oob(struct tdb1_context *tdb, tdb1_off_t len, int probe)
380 {
381         if (len <= tdb->file->map_size) {
382                 return 0;
383         }
384         tdb->last_error = TDB_ERR_IO;
385         return -1;
386 }
387
388 /*
389   transaction version of tdb1_expand().
390 */
391 static int transaction1_expand_file(struct tdb1_context *tdb, tdb1_off_t size,
392                                     tdb1_off_t addition)
393 {
394         /* add a write to the transaction elements, so subsequent
395            reads see the zero data */
396         if (transaction1_write(tdb, size, NULL, addition) != 0) {
397                 return -1;
398         }
399
400         tdb->transaction->expanded = true;
401
402         return 0;
403 }
404
405 static const struct tdb1_methods transaction1_methods = {
406         transaction1_read,
407         transaction1_write,
408         transaction1_next_hash_chain,
409         transaction1_oob,
410         transaction1_expand_file,
411 };
412
413
414 /*
415   start a tdb transaction. No token is returned, as only a single
416   transaction is allowed to be pending per tdb1_context
417 */
418 static int _tdb1_transaction_start(struct tdb1_context *tdb)
419 {
420         /* some sanity checks */
421         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
422                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
423                                         "tdb1_transaction_start: cannot start a"
424                                         " transaction on a read-only or"
425                                         " internal db");
426                 return -1;
427         }
428
429         /* cope with nested tdb1_transaction_start() calls */
430         if (tdb->transaction != NULL) {
431                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
432                         tdb->last_error = TDB_ERR_EINVAL;
433                         return -1;
434                 }
435                 tdb->transaction->nesting++;
436                 return 0;
437         }
438
439         if (tdb1_have_extra_locks(tdb)) {
440                 /* the caller must not have any locks when starting a
441                    transaction as otherwise we'll be screwed by lack
442                    of nested locks in posix */
443                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
444                                         "tdb1_transaction_start: cannot start a"
445                                         " transaction with locks held");
446                 return -1;
447         }
448
449         if (tdb->travlocks.next != NULL) {
450                 /* you cannot use transactions inside a traverse (although you can use
451                    traverse inside a transaction) as otherwise you can end up with
452                    deadlock */
453                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
454                                         "tdb1_transaction_start: cannot start a"
455                                         " transaction within a traverse");
456                 return -1;
457         }
458
459         tdb->transaction = (struct tdb1_transaction *)
460                 calloc(sizeof(struct tdb1_transaction), 1);
461         if (tdb->transaction == NULL) {
462                 tdb->last_error = TDB_ERR_OOM;
463                 return -1;
464         }
465
466         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
467         tdb->transaction->block_size = tdb->page_size;
468
469         /* get the transaction write lock. This is a blocking lock. As
470            discussed with Volker, there are a number of ways we could
471            make this async, which we will probably do in the future */
472         if (tdb1_transaction_lock(tdb, F_WRLCK, TDB_LOCK_WAIT) == -1) {
473                 SAFE_FREE(tdb->transaction->blocks);
474                 SAFE_FREE(tdb->transaction);
475                 return -1;
476         }
477
478         /* get a read lock from the freelist to the end of file. This
479            is upgraded to a write lock during the commit */
480         if (tdb1_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
481                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
482                            "tdb1_transaction_start: failed to get hash locks");
483                 goto fail_allrecord_lock;
484         }
485
486         /* setup a copy of the hash table heads so the hash scan in
487            traverse can be fast */
488         tdb->transaction->hash_heads = (uint32_t *)
489                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
490         if (tdb->transaction->hash_heads == NULL) {
491                 tdb->last_error = TDB_ERR_OOM;
492                 goto fail;
493         }
494         if (tdb->methods->tdb1_read(tdb, TDB1_FREELIST_TOP, tdb->transaction->hash_heads,
495                                    TDB1_HASHTABLE_SIZE(tdb), 0) != 0) {
496                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
497                            "tdb1_transaction_start: failed to read hash heads");
498                 goto fail;
499         }
500
501         /* make sure we know about any file expansions already done by
502            anyone else */
503         tdb->methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
504         tdb->transaction->old_map_size = tdb->file->map_size;
505
506         /* finally hook the io methods, replacing them with
507            transaction specific methods */
508         tdb->transaction->io_methods = tdb->methods;
509         tdb->methods = &transaction1_methods;
510
511         return 0;
512
513 fail:
514         tdb1_allrecord_unlock(tdb, F_RDLCK);
515 fail_allrecord_lock:
516         tdb1_transaction_unlock(tdb, F_WRLCK);
517         SAFE_FREE(tdb->transaction->blocks);
518         SAFE_FREE(tdb->transaction->hash_heads);
519         SAFE_FREE(tdb->transaction);
520         return -1;
521 }
522
/*
  public entry point for starting a transaction; forwards to
  _tdb1_transaction_start() and returns its result unchanged
*/
int tdb1_transaction_start(struct tdb1_context *tdb)
{
	int rc = _tdb1_transaction_start(tdb);

	return rc;
}
527
528 /*
529   sync to disk
530 */
531 static int transaction1_sync(struct tdb1_context *tdb, tdb1_off_t offset, tdb1_len_t length)
532 {
533         if (tdb->flags & TDB_NOSYNC) {
534                 return 0;
535         }
536
537 #if HAVE_FDATASYNC
538         if (fdatasync(tdb->file->fd) != 0) {
539 #else
540         if (fsync(tdb->file->fd) != 0) {
541 #endif
542                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
543                                         "tdb1_transaction: fsync failed");
544                 return -1;
545         }
546 #if HAVE_MMAP
547         if (tdb->file->map_ptr) {
548                 tdb1_off_t moffset = offset & ~(tdb->page_size-1);
549                 if (msync(moffset + (char *)tdb->file->map_ptr,
550                           length + (offset - moffset), MS_SYNC) != 0) {
551                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
552                                                 "tdb1_transaction:"
553                                                 " msync failed - %s",
554                                                 strerror(errno));
555                         return -1;
556                 }
557         }
558 #endif
559         return 0;
560 }
561
562
563 static int _tdb1_transaction_cancel(struct tdb1_context *tdb)
564 {
565         int i, ret = 0;
566
567         if (tdb->transaction == NULL) {
568                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
569                                         "tdb1_transaction_cancel:"
570                                         " no transaction");
571                 return -1;
572         }
573
574         if (tdb->transaction->nesting != 0) {
575                 tdb->transaction->transaction_error = 1;
576                 tdb->transaction->nesting--;
577                 return 0;
578         }
579
580         tdb->file->map_size = tdb->transaction->old_map_size;
581
582         /* free all the transaction blocks */
583         for (i=0;i<tdb->transaction->num_blocks;i++) {
584                 if (tdb->transaction->blocks[i] != NULL) {
585                         free(tdb->transaction->blocks[i]);
586                 }
587         }
588         SAFE_FREE(tdb->transaction->blocks);
589
590         if (tdb->transaction->magic_offset) {
591                 const struct tdb1_methods *methods = tdb->transaction->io_methods;
592                 const uint32_t invalid = TDB1_RECOVERY_INVALID_MAGIC;
593
594                 /* remove the recovery marker */
595                 if (methods->tdb1_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
596                 transaction1_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
597                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
598                                    "tdb1_transaction_cancel: failed to"
599                                    " remove recovery magic");
600                         ret = -1;
601                 }
602         }
603
604         /* This also removes the OPEN_LOCK, if we have it. */
605         tdb1_release_transaction_locks(tdb);
606
607         /* restore the normal io methods */
608         tdb->methods = tdb->transaction->io_methods;
609
610         SAFE_FREE(tdb->transaction->hash_heads);
611         SAFE_FREE(tdb->transaction);
612
613         return ret;
614 }
615
/*
  public entry point for cancelling the current transaction; forwards
  to _tdb1_transaction_cancel() and returns its result unchanged
*/
int tdb1_transaction_cancel(struct tdb1_context *tdb)
{
	int rc = _tdb1_transaction_cancel(tdb);

	return rc;
}
623
624 /*
625   work out how much space the linearised recovery data will consume
626 */
627 static tdb1_len_t tdb1_recovery_size(struct tdb1_context *tdb)
628 {
629         tdb1_len_t recovery_size = 0;
630         int i;
631
632         recovery_size = sizeof(uint32_t);
633         for (i=0;i<tdb->transaction->num_blocks;i++) {
634                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
635                         break;
636                 }
637                 if (tdb->transaction->blocks[i] == NULL) {
638                         continue;
639                 }
640                 recovery_size += 2*sizeof(tdb1_off_t);
641                 if (i == tdb->transaction->num_blocks-1) {
642                         recovery_size += tdb->transaction->last_block_size;
643                 } else {
644                         recovery_size += tdb->transaction->block_size;
645                 }
646         }
647
648         return recovery_size;
649 }
650
651 int tdb1_recovery_area(struct tdb1_context *tdb,
652                       const struct tdb1_methods *methods,
653                       tdb1_off_t *recovery_offset,
654                       struct tdb1_record *rec)
655 {
656         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, recovery_offset) == -1) {
657                 return -1;
658         }
659
660         if (*recovery_offset == 0) {
661                 rec->rec_len = 0;
662                 return 0;
663         }
664
665         if (methods->tdb1_read(tdb, *recovery_offset, rec, sizeof(*rec),
666                               TDB1_DOCONV()) == -1) {
667                 return -1;
668         }
669
670         /* ignore invalid recovery regions: can happen in crash */
671         if (rec->magic != TDB1_RECOVERY_MAGIC &&
672             rec->magic != TDB1_RECOVERY_INVALID_MAGIC) {
673                 *recovery_offset = 0;
674                 rec->rec_len = 0;
675         }
676         return 0;
677 }
678
679 /*
680   allocate the recovery area, or use an existing recovery area if it is
681   large enough
682 */
683 static int tdb1_recovery_allocate(struct tdb1_context *tdb,
684                                  tdb1_len_t *recovery_size,
685                                  tdb1_off_t *recovery_offset,
686                                  tdb1_len_t *recovery_max_size)
687 {
688         struct tdb1_record rec;
689         const struct tdb1_methods *methods = tdb->transaction->io_methods;
690         tdb1_off_t recovery_head;
691
692         if (tdb1_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
693                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
694                            "tdb1_recovery_allocate:"
695                            " failed to read recovery head");
696                 return -1;
697         }
698
699         *recovery_size = tdb1_recovery_size(tdb);
700
701         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
702                 /* it fits in the existing area */
703                 *recovery_max_size = rec.rec_len;
704                 *recovery_offset = recovery_head;
705                 return 0;
706         }
707
708         /* we need to free up the old recovery area, then allocate a
709            new one at the end of the file. Note that we cannot use
710            tdb1_allocate() to allocate the new one as that might return
711            us an area that is being currently used (as of the start of
712            the transaction) */
713         if (recovery_head != 0) {
714                 if (tdb1_free(tdb, recovery_head, &rec) == -1) {
715                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
716                                    "tdb1_recovery_allocate: failed to free"
717                                    " previous recovery area");
718                         return -1;
719                 }
720         }
721
722         /* the tdb1_free() call might have increased the recovery size */
723         *recovery_size = tdb1_recovery_size(tdb);
724
725         /* round up to a multiple of page size */
726         *recovery_max_size = TDB1_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
727         *recovery_offset = tdb->file->map_size;
728         recovery_head = *recovery_offset;
729
730         if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
731                                      (tdb->file->map_size - tdb->transaction->old_map_size) +
732                                      sizeof(rec) + *recovery_max_size) == -1) {
733                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
734                            "tdb1_recovery_allocate:"
735                            " failed to create recovery area");
736                 return -1;
737         }
738
739         /* remap the file (if using mmap) */
740         methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
741
742         /* we have to reset the old map size so that we don't try to expand the file
743            again in the transaction commit, which would destroy the recovery area */
744         tdb->transaction->old_map_size = tdb->file->map_size;
745
746         /* write the recovery header offset and sync - we can sync without a race here
747            as the magic ptr in the recovery record has not been set */
748         TDB1_CONV(recovery_head);
749         if (methods->tdb1_write(tdb, TDB1_RECOVERY_HEAD,
750                                &recovery_head, sizeof(tdb1_off_t)) == -1) {
751                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
752                            "tdb1_recovery_allocate:"
753                            " failed to write recovery head");
754                 return -1;
755         }
756         if (transaction1_write_existing(tdb, TDB1_RECOVERY_HEAD, &recovery_head, sizeof(tdb1_off_t)) == -1) {
757                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
758                            "tdb1_recovery_allocate:"
759                            " failed to write recovery head");
760                 return -1;
761         }
762
763         return 0;
764 }
765
766
767 /*
768   setup the recovery data that will be used on a crash during commit
769 */
770 static int transaction1_setup_recovery(struct tdb1_context *tdb,
771                                        tdb1_off_t *magic_offset)
772 {
773         tdb1_len_t recovery_size;
774         unsigned char *data, *p;
775         const struct tdb1_methods *methods = tdb->transaction->io_methods;
776         struct tdb1_record *rec;
777         tdb1_off_t recovery_offset, recovery_max_size;
778         tdb1_off_t old_map_size = tdb->transaction->old_map_size;
779         uint32_t magic, tailer;
780         int i;
781
782         /*
783           check that the recovery area has enough space
784         */
785         if (tdb1_recovery_allocate(tdb, &recovery_size,
786                                   &recovery_offset, &recovery_max_size) == -1) {
787                 return -1;
788         }
789
790         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
791         if (data == NULL) {
792                 tdb->last_error = TDB_ERR_OOM;
793                 return -1;
794         }
795
796         rec = (struct tdb1_record *)data;
797         memset(rec, 0, sizeof(*rec));
798
799         rec->magic    = TDB1_RECOVERY_INVALID_MAGIC;
800         rec->data_len = recovery_size;
801         rec->rec_len  = recovery_max_size;
802         rec->key_len  = old_map_size;
803         TDB1_CONV(*rec);
804
805         /* build the recovery data into a single blob to allow us to do a single
806            large write, which should be more efficient */
807         p = data + sizeof(*rec);
808         for (i=0;i<tdb->transaction->num_blocks;i++) {
809                 tdb1_off_t offset;
810                 tdb1_len_t length;
811
812                 if (tdb->transaction->blocks[i] == NULL) {
813                         continue;
814                 }
815
816                 offset = i * tdb->transaction->block_size;
817                 length = tdb->transaction->block_size;
818                 if (i == tdb->transaction->num_blocks-1) {
819                         length = tdb->transaction->last_block_size;
820                 }
821
822                 if (offset >= old_map_size) {
823                         continue;
824                 }
825                 if (offset + length > tdb->transaction->old_map_size) {
826                         tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT,
827                                                 TDB_LOG_ERROR,
828                                                 "tdb1_transaction_setup_recovery: transaction data over new region boundary");
829                         free(data);
830                         return -1;
831                 }
832                 memcpy(p, &offset, 4);
833                 memcpy(p+4, &length, 4);
834                 if (TDB1_DOCONV()) {
835                         tdb1_convert(p, 8);
836                 }
837                 /* the recovery area contains the old data, not the
838                    new data, so we have to call the original tdb1_read
839                    method to get it */
840                 if (methods->tdb1_read(tdb, offset, p + 8, length, 0) != 0) {
841                         free(data);
842                         tdb->last_error = TDB_ERR_IO;
843                         return -1;
844                 }
845                 p += 8 + length;
846         }
847
848         /* and the tailer */
849         tailer = sizeof(*rec) + recovery_max_size;
850         memcpy(p, &tailer, 4);
851         if (TDB1_DOCONV()) {
852                 tdb1_convert(p, 4);
853         }
854
855         /* write the recovery data to the recovery area */
856         if (methods->tdb1_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
857                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
858                            "tdb1_transaction_setup_recovery:"
859                            " failed to write recovery data");
860                 free(data);
861                 return -1;
862         }
863         if (transaction1_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
864                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
865                            "tdb1_transaction_setup_recovery: failed to write"
866                            " secondary recovery data");
867                 free(data);
868                 return -1;
869         }
870
871         /* as we don't have ordered writes, we have to sync the recovery
872            data before we update the magic to indicate that the recovery
873            data is present */
874         if (transaction1_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
875                 free(data);
876                 return -1;
877         }
878
879         free(data);
880
881         magic = TDB1_RECOVERY_MAGIC;
882         TDB1_CONV(magic);
883
884         *magic_offset = recovery_offset + offsetof(struct tdb1_record, magic);
885
886         if (methods->tdb1_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
887                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
888                            "tdb1_transaction_setup_recovery:"
889                            " failed to write recovery magic");
890                 return -1;
891         }
892         if (transaction1_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
893                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
894                            "tdb1_transaction_setup_recovery:"
895                            " failed to write secondary recovery magic");
896                 return -1;
897         }
898
899         /* ensure the recovery magic marker is on disk */
900         if (transaction1_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
901                 return -1;
902         }
903
904         return 0;
905 }
906
907 static int _tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
908 {
909         const struct tdb1_methods *methods;
910
911         if (tdb->transaction == NULL) {
912                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
913                                         "tdb1_transaction_prepare_commit:"
914                                         " no transaction");
915                 return -1;
916         }
917
918         if (tdb->transaction->prepared) {
919                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
920                                         "tdb1_transaction_prepare_commit:"
921                                         " transaction already prepared");
922                 _tdb1_transaction_cancel(tdb);
923                 return -1;
924         }
925
926         if (tdb->transaction->transaction_error) {
927                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
928                                         "tdb1_transaction_prepare_commit:"
929                                         " transaction error pending");
930                 _tdb1_transaction_cancel(tdb);
931                 return -1;
932         }
933
934
935         if (tdb->transaction->nesting != 0) {
936                 return 0;
937         }
938
939         /* check for a null transaction */
940         if (tdb->transaction->blocks == NULL) {
941                 return 0;
942         }
943
944         methods = tdb->transaction->io_methods;
945
946         /* if there are any locks pending then the caller has not
947            nested their locks properly, so fail the transaction */
948         if (tdb1_have_extra_locks(tdb)) {
949                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
950                                         "tdb1_transaction_prepare_commit:"
951                                         " locks pending on commit");
952                 _tdb1_transaction_cancel(tdb);
953                 return -1;
954         }
955
956         /* upgrade the main transaction lock region to a write lock */
957         if (tdb1_allrecord_upgrade(tdb) == -1) {
958                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
959                            "tdb1_transaction_prepare_commit:"
960                            " failed to upgrade hash locks");
961                 _tdb1_transaction_cancel(tdb);
962                 return -1;
963         }
964
965         /* get the open lock - this prevents new users attaching to the database
966            during the commit */
967         if (tdb1_nest_lock(tdb, TDB1_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
968                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
969                            "tdb1_transaction_prepare_commit:"
970                            " failed to get open lock");
971                 _tdb1_transaction_cancel(tdb);
972                 return -1;
973         }
974
975         if (!(tdb->flags & TDB_NOSYNC)) {
976                 /* write the recovery data to the end of the file */
977                 if (transaction1_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
978                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
979                                    "tdb1_transaction_prepare_commit:"
980                                    " failed to setup recovery data");
981                         _tdb1_transaction_cancel(tdb);
982                         return -1;
983                 }
984         }
985
986         tdb->transaction->prepared = true;
987
988         /* expand the file to the new size if needed */
989         if (tdb->file->map_size != tdb->transaction->old_map_size) {
990                 if (methods->tdb1_expand_file(tdb, tdb->transaction->old_map_size,
991                                              tdb->file->map_size -
992                                              tdb->transaction->old_map_size) == -1) {
993                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
994                                    "tdb1_transaction_prepare_commit:"
995                                    " expansion failed");
996                         _tdb1_transaction_cancel(tdb);
997                         return -1;
998                 }
999                 tdb->file->map_size = tdb->transaction->old_map_size;
1000                 methods->tdb1_oob(tdb, tdb->file->map_size + 1, 1);
1001         }
1002
1003         /* Keep the open lock until the actual commit */
1004
1005         return 0;
1006 }
1007
/*
   Public entry point for preparing the current transaction for commit.
   Forwards to _tdb1_transaction_prepare_commit() and hands back its
   return value unchanged.
*/
int tdb1_transaction_prepare_commit(struct tdb1_context *tdb)
{
        int ret = _tdb1_transaction_prepare_commit(tdb);

        return ret;
}
1015
1016 /* A repack is worthwhile if the largest is less than half total free. */
1017 static bool repack_worthwhile(struct tdb1_context *tdb)
1018 {
1019         tdb1_off_t ptr;
1020         struct tdb1_record rec;
1021         tdb1_len_t total = 0, largest = 0;
1022
1023         if (tdb1_ofs_read(tdb, TDB1_FREELIST_TOP, &ptr) == -1) {
1024                 return false;
1025         }
1026
1027         while (ptr != 0 && tdb1_rec_free_read(tdb, ptr, &rec) == 0) {
1028                 total += rec.rec_len;
1029                 if (rec.rec_len > largest) {
1030                         largest = rec.rec_len;
1031                 }
1032                 ptr = rec.next;
1033         }
1034
1035         return total > largest * 2;
1036 }
1037
1038 /*
1039   commit the current transaction
1040 */
1041 int tdb1_transaction_commit(struct tdb1_context *tdb)
1042 {
1043         const struct tdb1_methods *methods;
1044         int i;
1045         bool need_repack = false;
1046
1047         if (tdb->transaction == NULL) {
1048                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
1049                                         "tdb1_transaction_commit:"
1050                                         " no transaction");
1051                 return -1;
1052         }
1053
1054         if (tdb->transaction->transaction_error) {
1055                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
1056                                         "tdb1_transaction_commit:"
1057                                         " transaction error pending");
1058                 _tdb1_transaction_cancel(tdb);
1059                 return -1;
1060         }
1061
1062
1063         if (tdb->transaction->nesting != 0) {
1064                 tdb->transaction->nesting--;
1065                 return 0;
1066         }
1067
1068         /* check for a null transaction */
1069         if (tdb->transaction->blocks == NULL) {
1070                 _tdb1_transaction_cancel(tdb);
1071                 return 0;
1072         }
1073
1074         if (!tdb->transaction->prepared) {
1075                 int ret = _tdb1_transaction_prepare_commit(tdb);
1076                 if (ret)
1077                         return ret;
1078         }
1079
1080         methods = tdb->transaction->io_methods;
1081
1082         /* perform all the writes */
1083         for (i=0;i<tdb->transaction->num_blocks;i++) {
1084                 tdb1_off_t offset;
1085                 tdb1_len_t length;
1086
1087                 if (tdb->transaction->blocks[i] == NULL) {
1088                         continue;
1089                 }
1090
1091                 offset = i * tdb->transaction->block_size;
1092                 length = tdb->transaction->block_size;
1093                 if (i == tdb->transaction->num_blocks-1) {
1094                         length = tdb->transaction->last_block_size;
1095                 }
1096
1097                 if (methods->tdb1_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1098                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1099                                    "tdb1_transaction_commit:"
1100                                    " write failed during commit");
1101
1102                         /* we've overwritten part of the data and
1103                            possibly expanded the file, so we need to
1104                            run the crash recovery code */
1105                         tdb->methods = methods;
1106                         tdb1_transaction_recover(tdb);
1107
1108                         _tdb1_transaction_cancel(tdb);
1109
1110                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1111                                    "tdb1_transaction_commit: write failed");
1112                         return -1;
1113                 }
1114                 SAFE_FREE(tdb->transaction->blocks[i]);
1115         }
1116
1117         /* Do this before we drop lock or blocks. */
1118         if (tdb->transaction->expanded) {
1119                 need_repack = repack_worthwhile(tdb);
1120         }
1121
1122         SAFE_FREE(tdb->transaction->blocks);
1123         tdb->transaction->num_blocks = 0;
1124
1125         /* ensure the new data is on disk */
1126         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1127                 return -1;
1128         }
1129
1130         /*
1131           TODO: maybe write to some dummy hdr field, or write to magic
1132           offset without mmap, before the last sync, instead of the
1133           utime() call
1134         */
1135
1136         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1137            don't change the mtime of the file, this means the file may
1138            not be backed up (as tdb rounding to block sizes means that
1139            file size changes are quite rare too). The following forces
1140            mtime changes when a transaction completes */
1141 #if HAVE_UTIME
1142         utime(tdb->name, NULL);
1143 #endif
1144
1145         /* use a transaction cancel to free memory and remove the
1146            transaction locks */
1147         _tdb1_transaction_cancel(tdb);
1148
1149         if (need_repack) {
1150                 return tdb1_repack(tdb);
1151         }
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   recover from an aborted transaction. Must be called with exclusive
1159   database write access already established (including the open
1160   lock to prevent new processes attaching)
1161 */
1162 int tdb1_transaction_recover(struct tdb1_context *tdb)
1163 {
1164         tdb1_off_t recovery_head, recovery_eof;
1165         unsigned char *data, *p;
1166         uint32_t zero = 0;
1167         struct tdb1_record rec;
1168
1169         /* find the recovery area */
1170         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1171                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1172                            "tdb1_transaction_recover:"
1173                            " failed to read recovery head");
1174                 return -1;
1175         }
1176
1177         if (recovery_head == 0) {
1178                 /* we have never allocated a recovery record */
1179                 return 0;
1180         }
1181
1182         /* read the recovery record */
1183         if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
1184                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1185                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1186                            "tdb1_transaction_recover:"
1187                            " failed to read recovery record");
1188                 return -1;
1189         }
1190
1191         if (rec.magic != TDB1_RECOVERY_MAGIC) {
1192                 /* there is no valid recovery data */
1193                 return 0;
1194         }
1195
1196         if (tdb->read_only) {
1197                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
1198                                         "tdb1_transaction_recover:"
1199                                         " attempt to recover read only"
1200                                         " database");
1201                 return -1;
1202         }
1203
1204         recovery_eof = rec.key_len;
1205
1206         data = (unsigned char *)malloc(rec.data_len);
1207         if (data == NULL) {
1208                 tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
1209                                         "tdb1_transaction_recover:"
1210                                         " failed to allocate recovery data");
1211                 return -1;
1212         }
1213
1214         /* read the full recovery data */
1215         if (tdb->methods->tdb1_read(tdb, recovery_head + sizeof(rec), data,
1216                                    rec.data_len, 0) == -1) {
1217                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1218                            "tdb1_transaction_recover:"
1219                            " failed to read recovery data");
1220                 return -1;
1221         }
1222
1223         /* recover the file data */
1224         p = data;
1225         while (p+8 < data + rec.data_len) {
1226                 uint32_t ofs, len;
1227                 if (TDB1_DOCONV()) {
1228                         tdb1_convert(p, 8);
1229                 }
1230                 memcpy(&ofs, p, 4);
1231                 memcpy(&len, p+4, 4);
1232
1233                 if (tdb->methods->tdb1_write(tdb, ofs, p+8, len) == -1) {
1234                         free(data);
1235                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1236                                    "tdb1_transaction_recover: failed to recover"
1237                                    " %d bytes at offset %d", len, ofs);
1238                         return -1;
1239                 }
1240                 p += 8 + len;
1241         }
1242
1243         free(data);
1244
1245         if (transaction1_sync(tdb, 0, tdb->file->map_size) == -1) {
1246                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1247                            "tdb1_transaction_recover: failed to sync recovery");
1248                 return -1;
1249         }
1250
1251         /* if the recovery area is after the recovered eof then remove it */
1252         if (recovery_eof <= recovery_head) {
1253                 if (tdb1_ofs_write(tdb, TDB1_RECOVERY_HEAD, &zero) == -1) {
1254                         tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1255                                    "tdb1_transaction_recover: failed to remove"
1256                                    " recovery head");
1257                         return -1;
1258                 }
1259         }
1260
1261         /* remove the recovery magic */
1262         if (tdb1_ofs_write(tdb, recovery_head + offsetof(struct tdb1_record, magic),
1263                           &zero) == -1) {
1264                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1265                            "tdb1_transaction_recover: failed to remove"
1266                            " recovery magic");
1267                 return -1;
1268         }
1269
1270         if (transaction1_sync(tdb, 0, recovery_eof) == -1) {
1271                 tdb_logerr(tdb, tdb->last_error, TDB_LOG_ERROR,
1272                            "tdb1_transaction_recover:"
1273                            " failed to sync2 recovery");
1274                 return -1;
1275         }
1276
1277         tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
1278                    "tdb1_transaction_recover: recovered %d byte database",
1279                    recovery_eof);
1280
1281         /* all done */
1282         return 0;
1283 }
1284
1285 /* Any I/O failures we say "needs recovery". */
1286 bool tdb1_needs_recovery(struct tdb1_context *tdb)
1287 {
1288         tdb1_off_t recovery_head;
1289         struct tdb1_record rec;
1290
1291         /* find the recovery area */
1292         if (tdb1_ofs_read(tdb, TDB1_RECOVERY_HEAD, &recovery_head) == -1) {
1293                 return true;
1294         }
1295
1296         if (recovery_head == 0) {
1297                 /* we have never allocated a recovery record */
1298                 return false;
1299         }
1300
1301         /* read the recovery record */
1302         if (tdb->methods->tdb1_read(tdb, recovery_head, &rec,
1303                                    sizeof(rec), TDB1_DOCONV()) == -1) {
1304                 return true;
1305         }
1306
1307         return (rec.magic == TDB1_RECOVERY_MAGIC);
1308 }