]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
First cut of replay_trace for tdb.
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
/*
  hold the context of any current transaction

  One instance hangs off tdb->transaction while a transaction is open.
  It is allocated zero-filled by tdb_transaction_start() and released
  by tdb_transaction_cancel_internal().
*/
struct tdb_transaction {
        /* we keep a mirrored copy of the tdb hash heads here so
           tdb_next_hash_chain() can operate efficiently. Slot 0 holds
           the freelist head, so hash chain N lives in slot N+1 */
        uint32_t *hash_heads;

        /* the original io methods - used to do IOs to the real db.
           Saved when the transaction hooks tdb->methods and restored
           when the transaction is cancelled */
        const struct tdb_methods *io_methods;

        /* the list of transaction blocks. When a block is first
           written to, it gets created in this list. The array is
           indexed by (file offset / block_size); entries for blocks
           that have not been written stay NULL */
        uint8_t **blocks;
        uint32_t num_blocks;      /* number of entries in blocks[] */
        uint32_t block_size;      /* bytes in each block */
        uint32_t last_block_size; /* number of valid bytes in the last block */

        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
           transaction is ended */
        int transaction_error;

        /* when inside a transaction we need to keep track of any
           nested tdb_transaction_start() calls, as these are allowed,
           but don't create a new transaction */
        int nesting;

        /* old file size before transaction */
        tdb_len_t old_map_size;
};
127
128
129 /*
130   read while in a transaction. We need to check first if the data is in our list
131   of transaction elements, then if not do a real read
132 */
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
134                             tdb_len_t len, int cv)
135 {
136         uint32_t blk;
137
138         /* break it down into block sized ops */
139         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
140                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
142                         return -1;
143                 }
144                 len -= len2;
145                 off += len2;
146                 buf = (void *)(len2 + (char *)buf);
147         }
148
149         if (len == 0) {
150                 return 0;
151         }
152
153         blk = off / tdb->transaction->block_size;
154
155         /* see if we have it in the block list */
156         if (tdb->transaction->num_blocks <= blk ||
157             tdb->transaction->blocks[blk] == NULL) {
158                 /* nope, do a real read */
159                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
160                         goto fail;
161                 }
162                 return 0;
163         }
164
165         /* it is in the block list. Now check for the last block */
166         if (blk == tdb->transaction->num_blocks-1) {
167                 if (len > tdb->transaction->last_block_size) {
168                         goto fail;
169                 }
170         }
171         
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
174         if (cv) {
175                 tdb_convert(buf, len);
176         }
177         return 0;
178
179 fail:
180         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181         tdb->ecode = TDB_ERR_IO;
182         tdb->transaction->transaction_error = 1;
183         return -1;
184 }
185
186
187 /*
188   write while in a transaction
189 */
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
191                              const void *buf, tdb_len_t len)
192 {
193         uint32_t blk;
194
195         /* if the write is to a hash head, then update the transaction
196            hash heads */
197         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
201         }
202
203         /* break it up into block sized chunks */
204         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
205                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206                 if (transaction_write(tdb, off, buf, len2) != 0) {
207                         return -1;
208                 }
209                 len -= len2;
210                 off += len2;
211                 if (buf != NULL) {
212                         buf = (const void *)(len2 + (const char *)buf);
213                 }
214         }
215
216         if (len == 0) {
217                 return 0;
218         }
219
220         blk = off / tdb->transaction->block_size;
221         off = off % tdb->transaction->block_size;
222
223         if (tdb->transaction->num_blocks <= blk) {
224                 uint8_t **new_blocks;
225                 /* expand the blocks array */
226                 if (tdb->transaction->blocks == NULL) {
227                         new_blocks = (uint8_t **)malloc(
228                                 (blk+1)*sizeof(uint8_t *));
229                 } else {
230                         new_blocks = (uint8_t **)realloc(
231                                 tdb->transaction->blocks,
232                                 (blk+1)*sizeof(uint8_t *));
233                 }
234                 if (new_blocks == NULL) {
235                         tdb->ecode = TDB_ERR_OOM;
236                         goto fail;
237                 }
238                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
239                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240                 tdb->transaction->blocks = new_blocks;
241                 tdb->transaction->num_blocks = blk+1;
242                 tdb->transaction->last_block_size = 0;
243         }
244
245         /* allocate and fill a block? */
246         if (tdb->transaction->blocks[blk] == NULL) {
247                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248                 if (tdb->transaction->blocks[blk] == NULL) {
249                         tdb->ecode = TDB_ERR_OOM;
250                         tdb->transaction->transaction_error = 1;
251                         return -1;                      
252                 }
253                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254                         tdb_len_t len2 = tdb->transaction->block_size;
255                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
257                         }
258                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
259                                                                    tdb->transaction->blocks[blk], 
260                                                                    len2, 0) != 0) {
261                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
262                                 tdb->ecode = TDB_ERR_IO;
263                                 goto fail;
264                         }
265                         if (blk == tdb->transaction->num_blocks-1) {
266                                 tdb->transaction->last_block_size = len2;
267                         }                       
268                 }
269         }
270         
271         /* overwrite part of an existing block */
272         if (buf == NULL) {
273                 memset(tdb->transaction->blocks[blk] + off, 0, len);
274         } else {
275                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
276         }
277         if (blk == tdb->transaction->num_blocks-1) {
278                 if (len + off > tdb->transaction->last_block_size) {
279                         tdb->transaction->last_block_size = len + off;
280                 }
281         }
282
283         return 0;
284
285 fail:
286         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
287                  (blk*tdb->transaction->block_size) + off, len));
288         tdb->transaction->transaction_error = 1;
289         return -1;
290 }
291
292
293 /*
294   write while in a transaction - this varient never expands the transaction blocks, it only
295   updates existing blocks. This means it cannot change the recovery size
296 */
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
298                                       const void *buf, tdb_len_t len)
299 {
300         uint32_t blk;
301
302         /* break it up into block sized chunks */
303         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
304                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
306                         return -1;
307                 }
308                 len -= len2;
309                 off += len2;
310                 if (buf != NULL) {
311                         buf = (const void *)(len2 + (const char *)buf);
312                 }
313         }
314
315         if (len == 0) {
316                 return 0;
317         }
318
319         blk = off / tdb->transaction->block_size;
320         off = off % tdb->transaction->block_size;
321
322         if (tdb->transaction->num_blocks <= blk ||
323             tdb->transaction->blocks[blk] == NULL) {
324                 return 0;
325         }
326
327         if (blk == tdb->transaction->num_blocks-1 &&
328             off + len > tdb->transaction->last_block_size) {
329                 if (off >= tdb->transaction->last_block_size) {
330                         return 0;
331                 }
332                 len = tdb->transaction->last_block_size - off;
333         }
334
335         /* overwrite part of an existing block */
336         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
337
338         return 0;
339 }
340
341
342 /*
343   accelerated hash chain head search, using the cached hash heads
344 */
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
346 {
347         uint32_t h = *chain;
348         for (;h < tdb->header.hash_size;h++) {
349                 /* the +1 takes account of the freelist */
350                 if (0 != tdb->transaction->hash_heads[h+1]) {
351                         break;
352                 }
353         }
354         (*chain) = h;
355 }
356
357 /*
358   out of bounds check during a transaction
359 */
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
361 {
362         if (len <= tdb->map_size) {
363                 return 0;
364         }
365         return TDB_ERRCODE(TDB_ERR_IO, -1);
366 }
367
368 /*
369   transaction version of tdb_expand().
370 */
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
372                                    tdb_off_t addition)
373 {
374         /* add a write to the transaction elements, so subsequent
375            reads see the zero data */
376         if (transaction_write(tdb, size, NULL, addition) != 0) {
377                 return -1;
378         }
379
380         return 0;
381 }
382
383 /*
384   brlock during a transaction - ignore them
385 */
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
387                               int rw_type, int lck_type, int probe, size_t len)
388 {
389         return 0;
390 }
391
/*
  io method table that tdb_transaction_start() installs into
  tdb->methods for the lifetime of a transaction. The initialiser is
  positional, so the entries must stay in the same order as the members
  of struct tdb_methods (declared outside this file).
*/
static const struct tdb_methods transaction_methods = {
        transaction_read,
        transaction_write,
        transaction_next_hash_chain,
        transaction_oob,
        transaction_expand_file,
        transaction_brlock
};
400
401 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
402 {
403         int i;
404
405         if (tdb->transaction == NULL) {
406                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
407                 return -1;
408         }
409
410         if (tdb->transaction->nesting != 0) {
411                 tdb->transaction->transaction_error = 1;
412                 tdb->transaction->nesting--;
413                 return 0;
414         }               
415
416         tdb->map_size = tdb->transaction->old_map_size;
417
418         /* free all the transaction blocks */
419         for (i=0;i<tdb->transaction->num_blocks;i++) {
420                 if (tdb->transaction->blocks[i] != NULL) {
421                         free(tdb->transaction->blocks[i]);
422                 }
423         }
424         SAFE_FREE(tdb->transaction->blocks);
425
426         /* remove any global lock created during the transaction */
427         if (tdb->global_lock.count != 0) {
428                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
429                 tdb->global_lock.count = 0;
430         }
431
432         /* remove any locks created during the transaction */
433         if (tdb->num_locks != 0) {
434                 for (i=0;i<tdb->num_lockrecs;i++) {
435                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
436                                    F_UNLCK,F_SETLKW, 0, 1);
437                 }
438                 tdb->num_locks = 0;
439                 tdb->num_lockrecs = 0;
440                 SAFE_FREE(tdb->lockrecs);
441         }
442
443         /* restore the normal io methods */
444         tdb->methods = tdb->transaction->io_methods;
445
446         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
447         tdb_transaction_unlock(tdb);
448         SAFE_FREE(tdb->transaction->hash_heads);
449         SAFE_FREE(tdb->transaction);
450         
451         return 0;
452 }
453
454 /*
455   start a tdb transaction. No token is returned, as only a single
456   transaction is allowed to be pending per tdb_context
457 */
458 int tdb_transaction_start(struct tdb_context *tdb)
459 {
460         tdb_trace(tdb, "tdb_transaction_start\n");
461
462         /* some sanity checks */
463         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
464                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
465                 tdb->ecode = TDB_ERR_EINVAL;
466                 return -1;
467         }
468
469         /* cope with nested tdb_transaction_start() calls */
470         if (tdb->transaction != NULL) {
471                 if (!tdb->flags & TDB_NO_NESTING) {
472                         tdb->transaction->nesting++;
473                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
474                                  tdb->transaction->nesting));
475                         return 0;
476                 } else {
477                         tdb_transaction_cancel_internal(tdb);
478                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
479                 }
480         }
481
482         if (tdb->num_locks != 0 || tdb->global_lock.count) {
483                 /* the caller must not have any locks when starting a
484                    transaction as otherwise we'll be screwed by lack
485                    of nested locks in posix */
486                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
487                 tdb->ecode = TDB_ERR_LOCK;
488                 return -1;
489         }
490
491         if (tdb->travlocks.next != NULL) {
492                 /* you cannot use transactions inside a traverse (although you can use
493                    traverse inside a transaction) as otherwise you can end up with
494                    deadlock */
495                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
496                 tdb->ecode = TDB_ERR_LOCK;
497                 return -1;
498         }
499
500         tdb->transaction = (struct tdb_transaction *)
501                 calloc(sizeof(struct tdb_transaction), 1);
502         if (tdb->transaction == NULL) {
503                 tdb->ecode = TDB_ERR_OOM;
504                 return -1;
505         }
506
507         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
508         tdb->transaction->block_size = tdb->page_size;
509
510         /* get the transaction write lock. This is a blocking lock. As
511            discussed with Volker, there are a number of ways we could
512            make this async, which we will probably do in the future */
513         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
514                 SAFE_FREE(tdb->transaction->blocks);
515                 SAFE_FREE(tdb->transaction);
516                 return -1;
517         }
518         
519         /* get a read lock from the freelist to the end of file. This
520            is upgraded to a write lock during the commit */
521         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
522                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
523                 tdb->ecode = TDB_ERR_LOCK;
524                 goto fail;
525         }
526
527         /* setup a copy of the hash table heads so the hash scan in
528            traverse can be fast */
529         tdb->transaction->hash_heads = (uint32_t *)
530                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
531         if (tdb->transaction->hash_heads == NULL) {
532                 tdb->ecode = TDB_ERR_OOM;
533                 goto fail;
534         }
535         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
536                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
537                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
538                 tdb->ecode = TDB_ERR_IO;
539                 goto fail;
540         }
541
542         /* make sure we know about any file expansions already done by
543            anyone else */
544         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
545         tdb->transaction->old_map_size = tdb->map_size;
546
547         /* finally hook the io methods, replacing them with
548            transaction specific methods */
549         tdb->transaction->io_methods = tdb->methods;
550         tdb->methods = &transaction_methods;
551
552         return 0;
553         
554 fail:
555         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
556         tdb_transaction_unlock(tdb);
557         SAFE_FREE(tdb->transaction->blocks);
558         SAFE_FREE(tdb->transaction->hash_heads);
559         SAFE_FREE(tdb->transaction);
560         return -1;
561 }
562
563
/*
  public entry point for cancelling the current transaction: emits a
  trace record and hands the work to tdb_transaction_cancel_internal(),
  whose result is returned unchanged
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
        int ret;

        tdb_trace(tdb, "tdb_transaction_cancel\n");
        ret = tdb_transaction_cancel_internal(tdb);

        return ret;
}
572 /*
573   sync to disk
574 */
575 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
576 {       
577         if (fsync(tdb->fd) != 0) {
578                 tdb->ecode = TDB_ERR_IO;
579                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
580                 return -1;
581         }
582 #ifdef MS_SYNC
583         if (tdb->map_ptr) {
584                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
585                 if (msync(moffset + (char *)tdb->map_ptr, 
586                           length + (offset - moffset), MS_SYNC) != 0) {
587                         tdb->ecode = TDB_ERR_IO;
588                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
589                                  strerror(errno)));
590                         return -1;
591                 }
592         }
593 #endif
594         return 0;
595 }
596
597
598 /*
599   work out how much space the linearised recovery data will consume
600 */
601 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
602 {
603         tdb_len_t recovery_size = 0;
604         int i;
605
606         recovery_size = sizeof(uint32_t);
607         for (i=0;i<tdb->transaction->num_blocks;i++) {
608                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
609                         break;
610                 }
611                 if (tdb->transaction->blocks[i] == NULL) {
612                         continue;
613                 }
614                 recovery_size += 2*sizeof(tdb_off_t);
615                 if (i == tdb->transaction->num_blocks-1) {
616                         recovery_size += tdb->transaction->last_block_size;
617                 } else {
618                         recovery_size += tdb->transaction->block_size;
619                 }
620         }       
621
622         return recovery_size;
623 }
624
/*
  allocate the recovery area, or use an existing recovery area if it is
  large enough

  tdb               - database context with an active transaction
  recovery_size     - out: bytes of recovery data needed, as computed by
                      tdb_recovery_size()
  recovery_offset   - out: file offset of the recovery record
  recovery_max_size - out: rec_len of the recovery record, i.e. the room
                      available after the record header

  returns 0 on success, -1 on failure
*/
static int tdb_recovery_allocate(struct tdb_context *tdb, 
                                 tdb_len_t *recovery_size,
                                 tdb_off_t *recovery_offset,
                                 tdb_len_t *recovery_max_size)
{
        struct list_struct rec;
        /* the pre-transaction io methods: used below for everything
           that must reach the real file */
        const struct tdb_methods *methods = tdb->transaction->io_methods;
        tdb_off_t recovery_head;

        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
                return -1;
        }

        /* rec is only filled in when a recovery record already exists */
        rec.rec_len = 0;

        if (recovery_head != 0 && 
            methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
                return -1;
        }

        *recovery_size = tdb_recovery_size(tdb);

        if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
                /* it fits in the existing area */
                *recovery_max_size = rec.rec_len;
                *recovery_offset = recovery_head;
                return 0;
        }

        /* we need to free up the old recovery area, then allocate a
           new one at the end of the file. Note that we cannot use
           tdb_allocate() to allocate the new one as that might return
           us an area that is being currently used (as of the start of
           the transaction) */
        if (recovery_head != 0) {
                if (tdb_free(tdb, recovery_head, &rec) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
                        return -1;
                }
        }

        /* the tdb_free() call might have increased the recovery size */
        *recovery_size = tdb_recovery_size(tdb);

        /* round up to a multiple of page size */
        *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
        /* the new record goes at the current end of the map */
        *recovery_offset = tdb->map_size;
        recovery_head = *recovery_offset;

        /* grow the real file from its pre-transaction size by whatever
           the transaction has already added to map_size plus the space
           for the record header and its data area */
        if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
                                     (tdb->map_size - tdb->transaction->old_map_size) +
                                     sizeof(rec) + *recovery_max_size) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
                return -1;
        }

        /* remap the file (if using mmap) */
        methods->tdb_oob(tdb, tdb->map_size + 1, 1);

        /* we have to reset the old map size so that we don't try to expand the file
           again in the transaction commit, which would destroy the recovery area */
        tdb->transaction->old_map_size = tdb->map_size;

        /* write the recovery header offset and sync - we can sync without a race here
           as the magic ptr in the recovery record has not been set */
        CONVERT(recovery_head);
        /* first to the real file ... */
        if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
                               &recovery_head, sizeof(tdb_off_t)) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
                return -1;
        }
        /* ... then to the transaction's copy of that block, if it holds
           one, so the two stay in agreement */
        if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
                return -1;
        }

        return 0;
}
709
710
711 /*
712   setup the recovery data that will be used on a crash during commit
713 */
714 static int transaction_setup_recovery(struct tdb_context *tdb, 
715                                       tdb_off_t *magic_offset)
716 {
717         tdb_len_t recovery_size;
718         unsigned char *data, *p;
719         const struct tdb_methods *methods = tdb->transaction->io_methods;
720         struct list_struct *rec;
721         tdb_off_t recovery_offset, recovery_max_size;
722         tdb_off_t old_map_size = tdb->transaction->old_map_size;
723         uint32_t magic, tailer;
724         int i;
725
726         /*
727           check that the recovery area has enough space
728         */
729         if (tdb_recovery_allocate(tdb, &recovery_size, 
730                                   &recovery_offset, &recovery_max_size) == -1) {
731                 return -1;
732         }
733
734         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
735         if (data == NULL) {
736                 tdb->ecode = TDB_ERR_OOM;
737                 return -1;
738         }
739
740         rec = (struct list_struct *)data;
741         memset(rec, 0, sizeof(*rec));
742
743         rec->magic    = 0;
744         rec->data_len = recovery_size;
745         rec->rec_len  = recovery_max_size;
746         rec->key_len  = old_map_size;
747         CONVERT(rec);
748
749         /* build the recovery data into a single blob to allow us to do a single
750            large write, which should be more efficient */
751         p = data + sizeof(*rec);
752         for (i=0;i<tdb->transaction->num_blocks;i++) {
753                 tdb_off_t offset;
754                 tdb_len_t length;
755
756                 if (tdb->transaction->blocks[i] == NULL) {
757                         continue;
758                 }
759
760                 offset = i * tdb->transaction->block_size;
761                 length = tdb->transaction->block_size;
762                 if (i == tdb->transaction->num_blocks-1) {
763                         length = tdb->transaction->last_block_size;
764                 }
765                 
766                 if (offset >= old_map_size) {
767                         continue;
768                 }
769                 if (offset + length > tdb->transaction->old_map_size) {
770                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
771                         free(data);
772                         tdb->ecode = TDB_ERR_CORRUPT;
773                         return -1;
774                 }
775                 memcpy(p, &offset, 4);
776                 memcpy(p+4, &length, 4);
777                 if (DOCONV()) {
778                         tdb_convert(p, 8);
779                 }
780                 /* the recovery area contains the old data, not the
781                    new data, so we have to call the original tdb_read
782                    method to get it */
783                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
784                         free(data);
785                         tdb->ecode = TDB_ERR_IO;
786                         return -1;
787                 }
788                 p += 8 + length;
789         }
790
791         /* and the tailer */
792         tailer = sizeof(*rec) + recovery_max_size;
793         memcpy(p, &tailer, 4);
794         CONVERT(p);
795
796         /* write the recovery data to the recovery area */
797         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
798                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
799                 free(data);
800                 tdb->ecode = TDB_ERR_IO;
801                 return -1;
802         }
803         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
804                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
805                 free(data);
806                 tdb->ecode = TDB_ERR_IO;
807                 return -1;
808         }
809
810         /* as we don't have ordered writes, we have to sync the recovery
811            data before we update the magic to indicate that the recovery
812            data is present */
813         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
814                 free(data);
815                 return -1;
816         }
817
818         free(data);
819
820         magic = TDB_RECOVERY_MAGIC;
821         CONVERT(magic);
822
823         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
824
825         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
826                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
827                 tdb->ecode = TDB_ERR_IO;
828                 return -1;
829         }
830         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
831                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
832                 tdb->ecode = TDB_ERR_IO;
833                 return -1;
834         }
835
836         /* ensure the recovery magic marker is on disk */
837         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
838                 return -1;
839         }
840
841         return 0;
842 }
843
844 /*
845   commit the current transaction
846 */
847 int tdb_transaction_commit(struct tdb_context *tdb)
848 {       
849         const struct tdb_methods *methods;
850         tdb_off_t magic_offset = 0;
851         uint32_t zero = 0;
852         int i;
853
854         tdb_trace(tdb, "tdb_transaction_commit\n");
855         if (tdb->transaction == NULL) {
856                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
857                 return -1;
858         }
859
860         if (tdb->transaction->transaction_error) {
861                 tdb->ecode = TDB_ERR_IO;
862                 tdb_transaction_cancel_internal(tdb);
863                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
864                 return -1;
865         }
866
867
868         if (tdb->transaction->nesting != 0) {
869                 tdb->transaction->nesting--;
870                 return 0;
871         }               
872
873         /* check for a null transaction */
874         if (tdb->transaction->blocks == NULL) {
875                 tdb_transaction_cancel_internal(tdb);
876                 return 0;
877         }
878
879         methods = tdb->transaction->io_methods;
880         
881         /* if there are any locks pending then the caller has not
882            nested their locks properly, so fail the transaction */
883         if (tdb->num_locks || tdb->global_lock.count) {
884                 tdb->ecode = TDB_ERR_LOCK;
885                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
886                 tdb_transaction_cancel_internal(tdb);
887                 return -1;
888         }
889
890         /* upgrade the main transaction lock region to a write lock */
891         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
892                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
893                 tdb->ecode = TDB_ERR_LOCK;
894                 tdb_transaction_cancel_internal(tdb);
895                 return -1;
896         }
897
898         /* get the global lock - this prevents new users attaching to the database
899            during the commit */
900         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
901                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
902                 tdb->ecode = TDB_ERR_LOCK;
903                 tdb_transaction_cancel_internal(tdb);
904                 return -1;
905         }
906
907         if (!(tdb->flags & TDB_NOSYNC)) {
908                 /* write the recovery data to the end of the file */
909                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
910                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
911                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
912                         tdb_transaction_cancel_internal(tdb);
913                         return -1;
914                 }
915         }
916
917         /* expand the file to the new size if needed */
918         if (tdb->map_size != tdb->transaction->old_map_size) {
919                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
920                                              tdb->map_size - 
921                                              tdb->transaction->old_map_size) == -1) {
922                         tdb->ecode = TDB_ERR_IO;
923                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
924                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
925                         tdb_transaction_cancel_internal(tdb);
926                         return -1;
927                 }
928                 tdb->map_size = tdb->transaction->old_map_size;
929                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
930         }
931
932         /* perform all the writes */
933         for (i=0;i<tdb->transaction->num_blocks;i++) {
934                 tdb_off_t offset;
935                 tdb_len_t length;
936
937                 if (tdb->transaction->blocks[i] == NULL) {
938                         continue;
939                 }
940
941                 offset = i * tdb->transaction->block_size;
942                 length = tdb->transaction->block_size;
943                 if (i == tdb->transaction->num_blocks-1) {
944                         length = tdb->transaction->last_block_size;
945                 }
946
947                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
948                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
949                         
950                         /* we've overwritten part of the data and
951                            possibly expanded the file, so we need to
952                            run the crash recovery code */
953                         tdb->methods = methods;
954                         tdb_transaction_recover(tdb); 
955
956                         tdb_transaction_cancel_internal(tdb);
957                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
958
959                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
960                         return -1;
961                 }
962                 SAFE_FREE(tdb->transaction->blocks[i]);
963         } 
964
965         SAFE_FREE(tdb->transaction->blocks);
966         tdb->transaction->num_blocks = 0;
967
968         if (!(tdb->flags & TDB_NOSYNC)) {
969                 /* ensure the new data is on disk */
970                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
971                         return -1;
972                 }
973
974                 /* remove the recovery marker */
975                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
976                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
977                         return -1;
978                 }
979
980                 /* ensure the recovery marker has been removed on disk */
981                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
982                         return -1;
983                 }
984         }
985
986         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
987
988         /*
989           TODO: maybe write to some dummy hdr field, or write to magic
990           offset without mmap, before the last sync, instead of the
991           utime() call
992         */
993
994         /* on some systems (like Linux 2.6.x) changes via mmap/msync
995            don't change the mtime of the file, this means the file may
996            not be backed up (as tdb rounding to block sizes means that
997            file size changes are quite rare too). The following forces
998            mtime changes when a transaction completes */
999 #if HAVE_UTIME
1000         utime(tdb->name, NULL);
1001 #endif
1002
1003         /* use a transaction cancel to free memory and remove the
1004            transaction locks */
1005         tdb_transaction_cancel_internal(tdb);
1006
1007         return 0;
1008 }
1009
1010
1011 /*
1012   recover from an aborted transaction. Must be called with exclusive
1013   database write access already established (including the global
1014   lock to prevent new processes attaching)
1015 */
1016 int tdb_transaction_recover(struct tdb_context *tdb)
1017 {
1018         tdb_off_t recovery_head, recovery_eof;
1019         unsigned char *data, *p;
1020         uint32_t zero = 0;
1021         struct list_struct rec;
1022
1023         /* find the recovery area */
1024         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1025                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1026                 tdb->ecode = TDB_ERR_IO;
1027                 return -1;
1028         }
1029
1030         if (recovery_head == 0) {
1031                 /* we have never allocated a recovery record */
1032                 return 0;
1033         }
1034
1035         /* read the recovery record */
1036         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1037                                    sizeof(rec), DOCONV()) == -1) {
1038                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1039                 tdb->ecode = TDB_ERR_IO;
1040                 return -1;
1041         }
1042
1043         if (rec.magic != TDB_RECOVERY_MAGIC) {
1044                 /* there is no valid recovery data */
1045                 return 0;
1046         }
1047
1048         if (tdb->read_only) {
1049                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1050                 tdb->ecode = TDB_ERR_CORRUPT;
1051                 return -1;
1052         }
1053
1054         recovery_eof = rec.key_len;
1055
1056         data = (unsigned char *)malloc(rec.data_len);
1057         if (data == NULL) {
1058                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1059                 tdb->ecode = TDB_ERR_OOM;
1060                 return -1;
1061         }
1062
1063         /* read the full recovery data */
1064         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1065                                    rec.data_len, 0) == -1) {
1066                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1067                 tdb->ecode = TDB_ERR_IO;
1068                 return -1;
1069         }
1070
1071         /* recover the file data */
1072         p = data;
1073         while (p+8 < data + rec.data_len) {
1074                 uint32_t ofs, len;
1075                 if (DOCONV()) {
1076                         tdb_convert(p, 8);
1077                 }
1078                 memcpy(&ofs, p, 4);
1079                 memcpy(&len, p+4, 4);
1080
1081                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1082                         free(data);
1083                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1084                         tdb->ecode = TDB_ERR_IO;
1085                         return -1;
1086                 }
1087                 p += 8 + len;
1088         }
1089
1090         free(data);
1091
1092         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1093                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1094                 tdb->ecode = TDB_ERR_IO;
1095                 return -1;
1096         }
1097
1098         /* if the recovery area is after the recovered eof then remove it */
1099         if (recovery_eof <= recovery_head) {
1100                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1101                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1102                         tdb->ecode = TDB_ERR_IO;
1103                         return -1;                      
1104                 }
1105         }
1106
1107         /* remove the recovery magic */
1108         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1109                           &zero) == -1) {
1110                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1111                 tdb->ecode = TDB_ERR_IO;
1112                 return -1;                      
1113         }
1114         
1115         /* reduce the file size to the old size */
1116         tdb_munmap(tdb);
1117         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1118                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1119                 tdb->ecode = TDB_ERR_IO;
1120                 return -1;                      
1121         }
1122         tdb->map_size = recovery_eof;
1123         tdb_mmap(tdb);
1124
1125         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1126                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1127                 tdb->ecode = TDB_ERR_IO;
1128                 return -1;
1129         }
1130
1131         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1132                  recovery_eof));
1133
1134         /* all done */
1135         return 0;
1136 }