]> git.ozlabs.org Git - ccan/blob - ccan/tdb/transaction.c
Fix sequence numbers when tracing transaction.
[ccan] / ccan / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_NO_NESTING is passed to flags in tdb open then transaction
89     nesting is disabled. tdb_transaction_start() will then implicitly
90     cancel any pending transactions and always start a new transaction
91     context instead of nesting.
92
93 */
94
95
96 /*
97   hold the context of any current transaction
98 */
99 struct tdb_transaction {
100         /* we keep a mirrored copy of the tdb hash heads here so
101            tdb_next_hash_chain() can operate efficiently */
102         uint32_t *hash_heads;
103
104         /* the original io methods - used to do IOs to the real db */
105         const struct tdb_methods *io_methods;
106
107         /* the list of transaction blocks. When a block is first
108            written to, it gets created in this list */
109         uint8_t **blocks;
110         uint32_t num_blocks;
111         uint32_t block_size;      /* bytes in each block */
112         uint32_t last_block_size; /* number of valid bytes in the last block */
113
114         /* non-zero when an internal transaction error has
115            occurred. All write operations will then fail until the
116            transaction is ended */
117         int transaction_error;
118
119         /* when inside a transaction we need to keep track of any
120            nested tdb_transaction_start() calls, as these are allowed,
121            but don't create a new transaction */
122         int nesting;
123
124         /* old file size before transaction */
125         tdb_len_t old_map_size;
126 };
127
128
129 /*
130   read while in a transaction. We need to check first if the data is in our list
131   of transaction elements, then if not do a real read
132 */
133 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
134                             tdb_len_t len, int cv)
135 {
136         uint32_t blk;
137
138         /* break it down into block sized ops */
139         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
140                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
141                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
142                         return -1;
143                 }
144                 len -= len2;
145                 off += len2;
146                 buf = (void *)(len2 + (char *)buf);
147         }
148
149         if (len == 0) {
150                 return 0;
151         }
152
153         blk = off / tdb->transaction->block_size;
154
155         /* see if we have it in the block list */
156         if (tdb->transaction->num_blocks <= blk ||
157             tdb->transaction->blocks[blk] == NULL) {
158                 /* nope, do a real read */
159                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
160                         goto fail;
161                 }
162                 return 0;
163         }
164
165         /* it is in the block list. Now check for the last block */
166         if (blk == tdb->transaction->num_blocks-1) {
167                 if (len > tdb->transaction->last_block_size) {
168                         goto fail;
169                 }
170         }
171         
172         /* now copy it out of this block */
173         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
174         if (cv) {
175                 tdb_convert(buf, len);
176         }
177         return 0;
178
179 fail:
180         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
181         tdb->ecode = TDB_ERR_IO;
182         tdb->transaction->transaction_error = 1;
183         return -1;
184 }
185
186
187 /*
188   write while in a transaction
189 */
190 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
191                              const void *buf, tdb_len_t len)
192 {
193         uint32_t blk;
194
195         /* if the write is to a hash head, then update the transaction
196            hash heads */
197         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
198             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
199                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
200                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
201         }
202
203         /* break it up into block sized chunks */
204         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
205                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
206                 if (transaction_write(tdb, off, buf, len2) != 0) {
207                         return -1;
208                 }
209                 len -= len2;
210                 off += len2;
211                 if (buf != NULL) {
212                         buf = (const void *)(len2 + (const char *)buf);
213                 }
214         }
215
216         if (len == 0) {
217                 return 0;
218         }
219
220         blk = off / tdb->transaction->block_size;
221         off = off % tdb->transaction->block_size;
222
223         if (tdb->transaction->num_blocks <= blk) {
224                 uint8_t **new_blocks;
225                 /* expand the blocks array */
226                 if (tdb->transaction->blocks == NULL) {
227                         new_blocks = (uint8_t **)malloc(
228                                 (blk+1)*sizeof(uint8_t *));
229                 } else {
230                         new_blocks = (uint8_t **)realloc(
231                                 tdb->transaction->blocks,
232                                 (blk+1)*sizeof(uint8_t *));
233                 }
234                 if (new_blocks == NULL) {
235                         tdb->ecode = TDB_ERR_OOM;
236                         goto fail;
237                 }
238                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
239                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
240                 tdb->transaction->blocks = new_blocks;
241                 tdb->transaction->num_blocks = blk+1;
242                 tdb->transaction->last_block_size = 0;
243         }
244
245         /* allocate and fill a block? */
246         if (tdb->transaction->blocks[blk] == NULL) {
247                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
248                 if (tdb->transaction->blocks[blk] == NULL) {
249                         tdb->ecode = TDB_ERR_OOM;
250                         tdb->transaction->transaction_error = 1;
251                         return -1;                      
252                 }
253                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
254                         tdb_len_t len2 = tdb->transaction->block_size;
255                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
256                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
257                         }
258                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
259                                                                    tdb->transaction->blocks[blk], 
260                                                                    len2, 0) != 0) {
261                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
262                                 tdb->ecode = TDB_ERR_IO;
263                                 goto fail;
264                         }
265                         if (blk == tdb->transaction->num_blocks-1) {
266                                 tdb->transaction->last_block_size = len2;
267                         }                       
268                 }
269         }
270         
271         /* overwrite part of an existing block */
272         if (buf == NULL) {
273                 memset(tdb->transaction->blocks[blk] + off, 0, len);
274         } else {
275                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
276         }
277         if (blk == tdb->transaction->num_blocks-1) {
278                 if (len + off > tdb->transaction->last_block_size) {
279                         tdb->transaction->last_block_size = len + off;
280                 }
281         }
282
283         return 0;
284
285 fail:
286         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
287                  (blk*tdb->transaction->block_size) + off, len));
288         tdb->transaction->transaction_error = 1;
289         return -1;
290 }
291
292
293 /*
294   write while in a transaction - this varient never expands the transaction blocks, it only
295   updates existing blocks. This means it cannot change the recovery size
296 */
297 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
298                                       const void *buf, tdb_len_t len)
299 {
300         uint32_t blk;
301
302         /* break it up into block sized chunks */
303         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
304                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
305                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
306                         return -1;
307                 }
308                 len -= len2;
309                 off += len2;
310                 if (buf != NULL) {
311                         buf = (const void *)(len2 + (const char *)buf);
312                 }
313         }
314
315         if (len == 0) {
316                 return 0;
317         }
318
319         blk = off / tdb->transaction->block_size;
320         off = off % tdb->transaction->block_size;
321
322         if (tdb->transaction->num_blocks <= blk ||
323             tdb->transaction->blocks[blk] == NULL) {
324                 return 0;
325         }
326
327         if (blk == tdb->transaction->num_blocks-1 &&
328             off + len > tdb->transaction->last_block_size) {
329                 if (off >= tdb->transaction->last_block_size) {
330                         return 0;
331                 }
332                 len = tdb->transaction->last_block_size - off;
333         }
334
335         /* overwrite part of an existing block */
336         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
337
338         return 0;
339 }
340
341
342 /*
343   accelerated hash chain head search, using the cached hash heads
344 */
345 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
346 {
347         uint32_t h = *chain;
348         for (;h < tdb->header.hash_size;h++) {
349                 /* the +1 takes account of the freelist */
350                 if (0 != tdb->transaction->hash_heads[h+1]) {
351                         break;
352                 }
353         }
354         (*chain) = h;
355 }
356
357 /*
358   out of bounds check during a transaction
359 */
360 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
361 {
362         if (len <= tdb->map_size) {
363                 return 0;
364         }
365         return TDB_ERRCODE(TDB_ERR_IO, -1);
366 }
367
368 /*
369   transaction version of tdb_expand().
370 */
371 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
372                                    tdb_off_t addition)
373 {
374         /* add a write to the transaction elements, so subsequent
375            reads see the zero data */
376         if (transaction_write(tdb, size, NULL, addition) != 0) {
377                 return -1;
378         }
379
380         return 0;
381 }
382
383 /*
384   brlock during a transaction - ignore them
385 */
386 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
387                               int rw_type, int lck_type, int probe, size_t len)
388 {
389         return 0;
390 }
391
392 static const struct tdb_methods transaction_methods = {
393         transaction_read,
394         transaction_write,
395         transaction_next_hash_chain,
396         transaction_oob,
397         transaction_expand_file,
398         transaction_brlock
399 };
400
401 int tdb_transaction_cancel_internal(struct tdb_context *tdb)
402 {
403         int i;
404
405         if (tdb->transaction == NULL) {
406                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
407                 return -1;
408         }
409
410         if (tdb->transaction->nesting != 0) {
411                 tdb->transaction->transaction_error = 1;
412                 tdb->transaction->nesting--;
413                 return 0;
414         }               
415
416         tdb->map_size = tdb->transaction->old_map_size;
417
418         /* free all the transaction blocks */
419         for (i=0;i<tdb->transaction->num_blocks;i++) {
420                 if (tdb->transaction->blocks[i] != NULL) {
421                         free(tdb->transaction->blocks[i]);
422                 }
423         }
424         SAFE_FREE(tdb->transaction->blocks);
425
426         /* remove any global lock created during the transaction */
427         if (tdb->global_lock.count != 0) {
428                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
429                 tdb->global_lock.count = 0;
430         }
431
432         /* remove any locks created during the transaction */
433         if (tdb->num_locks != 0) {
434                 for (i=0;i<tdb->num_lockrecs;i++) {
435                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
436                                    F_UNLCK,F_SETLKW, 0, 1);
437                 }
438                 tdb->num_locks = 0;
439                 tdb->num_lockrecs = 0;
440                 SAFE_FREE(tdb->lockrecs);
441         }
442
443         /* restore the normal io methods */
444         tdb->methods = tdb->transaction->io_methods;
445
446         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
447         tdb_transaction_unlock(tdb);
448         SAFE_FREE(tdb->transaction->hash_heads);
449         SAFE_FREE(tdb->transaction);
450         
451         return 0;
452 }
453
454 /*
455   start a tdb transaction. No token is returned, as only a single
456   transaction is allowed to be pending per tdb_context
457 */
458 int tdb_transaction_start(struct tdb_context *tdb)
459 {
460         /* some sanity checks */
461         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
462                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
463                 tdb->ecode = TDB_ERR_EINVAL;
464                 return -1;
465         }
466
467         /* cope with nested tdb_transaction_start() calls */
468         if (tdb->transaction != NULL) {
469                 tdb_trace(tdb, "tdb_transaction_start");
470                 if (!tdb->flags & TDB_NO_NESTING) {
471                         tdb->transaction->nesting++;
472                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
473                                  tdb->transaction->nesting));
474                         return 0;
475                 } else {
476                         tdb_transaction_cancel_internal(tdb);
477                         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: cancelling previous transaction\n"));
478                 }
479         }
480
481         if (tdb->num_locks != 0 || tdb->global_lock.count) {
482                 /* the caller must not have any locks when starting a
483                    transaction as otherwise we'll be screwed by lack
484                    of nested locks in posix */
485                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
486                 tdb->ecode = TDB_ERR_LOCK;
487                 return -1;
488         }
489
490         if (tdb->travlocks.next != NULL) {
491                 /* you cannot use transactions inside a traverse (although you can use
492                    traverse inside a transaction) as otherwise you can end up with
493                    deadlock */
494                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
495                 tdb->ecode = TDB_ERR_LOCK;
496                 return -1;
497         }
498
499         tdb->transaction = (struct tdb_transaction *)
500                 calloc(sizeof(struct tdb_transaction), 1);
501         if (tdb->transaction == NULL) {
502                 tdb->ecode = TDB_ERR_OOM;
503                 return -1;
504         }
505
506         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
507         tdb->transaction->block_size = tdb->page_size;
508
509         /* get the transaction write lock. This is a blocking lock. As
510            discussed with Volker, there are a number of ways we could
511            make this async, which we will probably do in the future */
512         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
513                 SAFE_FREE(tdb->transaction->blocks);
514                 SAFE_FREE(tdb->transaction);
515                 return -1;
516         }
517         
518         /* get a read lock from the freelist to the end of file. This
519            is upgraded to a write lock during the commit */
520         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
521                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
522                 tdb->ecode = TDB_ERR_LOCK;
523                 goto fail;
524         }
525
526         /* setup a copy of the hash table heads so the hash scan in
527            traverse can be fast */
528         tdb->transaction->hash_heads = (uint32_t *)
529                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
530         if (tdb->transaction->hash_heads == NULL) {
531                 tdb->ecode = TDB_ERR_OOM;
532                 goto fail;
533         }
534         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
535                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
536                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
537                 tdb->ecode = TDB_ERR_IO;
538                 goto fail;
539         }
540
541         /* make sure we know about any file expansions already done by
542            anyone else */
543         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
544         tdb->transaction->old_map_size = tdb->map_size;
545
546         /* finally hook the io methods, replacing them with
547            transaction specific methods */
548         tdb->transaction->io_methods = tdb->methods;
549         tdb->methods = &transaction_methods;
550
551         /* Trace at the end, so we get sequence number correct. */
552         tdb_trace(tdb, "tdb_transaction_start");
553         return 0;
554         
555 fail:
556         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
557         tdb_transaction_unlock(tdb);
558         SAFE_FREE(tdb->transaction->blocks);
559         SAFE_FREE(tdb->transaction->hash_heads);
560         SAFE_FREE(tdb->transaction);
561         return -1;
562 }
563
564
/*
  cancel the current transaction: emit a trace record, then hand
  over to tdb_transaction_cancel_internal() and return its result
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_cancel");
	ret = tdb_transaction_cancel_internal(tdb);

	return ret;
}
573 /*
574   sync to disk
575 */
576 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
577 {       
578         if (fsync(tdb->fd) != 0) {
579                 tdb->ecode = TDB_ERR_IO;
580                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
581                 return -1;
582         }
583 #ifdef MS_SYNC
584         if (tdb->map_ptr) {
585                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
586                 if (msync(moffset + (char *)tdb->map_ptr, 
587                           length + (offset - moffset), MS_SYNC) != 0) {
588                         tdb->ecode = TDB_ERR_IO;
589                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
590                                  strerror(errno)));
591                         return -1;
592                 }
593         }
594 #endif
595         return 0;
596 }
597
598
599 /*
600   work out how much space the linearised recovery data will consume
601 */
602 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
603 {
604         tdb_len_t recovery_size = 0;
605         int i;
606
607         recovery_size = sizeof(uint32_t);
608         for (i=0;i<tdb->transaction->num_blocks;i++) {
609                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
610                         break;
611                 }
612                 if (tdb->transaction->blocks[i] == NULL) {
613                         continue;
614                 }
615                 recovery_size += 2*sizeof(tdb_off_t);
616                 if (i == tdb->transaction->num_blocks-1) {
617                         recovery_size += tdb->transaction->last_block_size;
618                 } else {
619                         recovery_size += tdb->transaction->block_size;
620                 }
621         }       
622
623         return recovery_size;
624 }
625
626 /*
627   allocate the recovery area, or use an existing recovery area if it is
628   large enough
629 */
630 static int tdb_recovery_allocate(struct tdb_context *tdb, 
631                                  tdb_len_t *recovery_size,
632                                  tdb_off_t *recovery_offset,
633                                  tdb_len_t *recovery_max_size)
634 {
635         struct list_struct rec;
636         const struct tdb_methods *methods = tdb->transaction->io_methods;
637         tdb_off_t recovery_head;
638
639         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
640                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
641                 return -1;
642         }
643
644         rec.rec_len = 0;
645
646         if (recovery_head != 0 && 
647             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
648                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
649                 return -1;
650         }
651
652         *recovery_size = tdb_recovery_size(tdb);
653
654         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
655                 /* it fits in the existing area */
656                 *recovery_max_size = rec.rec_len;
657                 *recovery_offset = recovery_head;
658                 return 0;
659         }
660
661         /* we need to free up the old recovery area, then allocate a
662            new one at the end of the file. Note that we cannot use
663            tdb_allocate() to allocate the new one as that might return
664            us an area that is being currently used (as of the start of
665            the transaction) */
666         if (recovery_head != 0) {
667                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
668                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
669                         return -1;
670                 }
671         }
672
673         /* the tdb_free() call might have increased the recovery size */
674         *recovery_size = tdb_recovery_size(tdb);
675
676         /* round up to a multiple of page size */
677         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
678         *recovery_offset = tdb->map_size;
679         recovery_head = *recovery_offset;
680
681         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
682                                      (tdb->map_size - tdb->transaction->old_map_size) +
683                                      sizeof(rec) + *recovery_max_size) == -1) {
684                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
685                 return -1;
686         }
687
688         /* remap the file (if using mmap) */
689         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
690
691         /* we have to reset the old map size so that we don't try to expand the file
692            again in the transaction commit, which would destroy the recovery area */
693         tdb->transaction->old_map_size = tdb->map_size;
694
695         /* write the recovery header offset and sync - we can sync without a race here
696            as the magic ptr in the recovery record has not been set */
697         CONVERT(recovery_head);
698         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
699                                &recovery_head, sizeof(tdb_off_t)) == -1) {
700                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
701                 return -1;
702         }
703         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
704                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
705                 return -1;
706         }
707
708         return 0;
709 }
710
711
712 /*
713   setup the recovery data that will be used on a crash during commit
714 */
715 static int transaction_setup_recovery(struct tdb_context *tdb, 
716                                       tdb_off_t *magic_offset)
717 {
718         tdb_len_t recovery_size;
719         unsigned char *data, *p;
720         const struct tdb_methods *methods = tdb->transaction->io_methods;
721         struct list_struct *rec;
722         tdb_off_t recovery_offset, recovery_max_size;
723         tdb_off_t old_map_size = tdb->transaction->old_map_size;
724         uint32_t magic, tailer;
725         int i;
726
727         /*
728           check that the recovery area has enough space
729         */
730         if (tdb_recovery_allocate(tdb, &recovery_size, 
731                                   &recovery_offset, &recovery_max_size) == -1) {
732                 return -1;
733         }
734
735         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
736         if (data == NULL) {
737                 tdb->ecode = TDB_ERR_OOM;
738                 return -1;
739         }
740
741         rec = (struct list_struct *)data;
742         memset(rec, 0, sizeof(*rec));
743
744         rec->magic    = 0;
745         rec->data_len = recovery_size;
746         rec->rec_len  = recovery_max_size;
747         rec->key_len  = old_map_size;
748         CONVERT(rec);
749
750         /* build the recovery data into a single blob to allow us to do a single
751            large write, which should be more efficient */
752         p = data + sizeof(*rec);
753         for (i=0;i<tdb->transaction->num_blocks;i++) {
754                 tdb_off_t offset;
755                 tdb_len_t length;
756
757                 if (tdb->transaction->blocks[i] == NULL) {
758                         continue;
759                 }
760
761                 offset = i * tdb->transaction->block_size;
762                 length = tdb->transaction->block_size;
763                 if (i == tdb->transaction->num_blocks-1) {
764                         length = tdb->transaction->last_block_size;
765                 }
766                 
767                 if (offset >= old_map_size) {
768                         continue;
769                 }
770                 if (offset + length > tdb->transaction->old_map_size) {
771                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
772                         free(data);
773                         tdb->ecode = TDB_ERR_CORRUPT;
774                         return -1;
775                 }
776                 memcpy(p, &offset, 4);
777                 memcpy(p+4, &length, 4);
778                 if (DOCONV()) {
779                         tdb_convert(p, 8);
780                 }
781                 /* the recovery area contains the old data, not the
782                    new data, so we have to call the original tdb_read
783                    method to get it */
784                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
785                         free(data);
786                         tdb->ecode = TDB_ERR_IO;
787                         return -1;
788                 }
789                 p += 8 + length;
790         }
791
792         /* and the tailer */
793         tailer = sizeof(*rec) + recovery_max_size;
794         memcpy(p, &tailer, 4);
795         CONVERT(p);
796
797         /* write the recovery data to the recovery area */
798         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
799                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
800                 free(data);
801                 tdb->ecode = TDB_ERR_IO;
802                 return -1;
803         }
804         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
805                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
806                 free(data);
807                 tdb->ecode = TDB_ERR_IO;
808                 return -1;
809         }
810
811         /* as we don't have ordered writes, we have to sync the recovery
812            data before we update the magic to indicate that the recovery
813            data is present */
814         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
815                 free(data);
816                 return -1;
817         }
818
819         free(data);
820
821         magic = TDB_RECOVERY_MAGIC;
822         CONVERT(magic);
823
824         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
825
826         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
827                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
828                 tdb->ecode = TDB_ERR_IO;
829                 return -1;
830         }
831         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
832                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
833                 tdb->ecode = TDB_ERR_IO;
834                 return -1;
835         }
836
837         /* ensure the recovery magic marker is on disk */
838         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
839                 return -1;
840         }
841
842         return 0;
843 }
844
845 /*
846   commit the current transaction
847 */
848 int tdb_transaction_commit(struct tdb_context *tdb)
849 {       
850         const struct tdb_methods *methods;
851         tdb_off_t magic_offset = 0;
852         uint32_t zero = 0;
853         int i;
854
855         tdb_trace(tdb, "tdb_transaction_commit");
856         if (tdb->transaction == NULL) {
857                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
858                 return -1;
859         }
860
861         if (tdb->transaction->transaction_error) {
862                 tdb->ecode = TDB_ERR_IO;
863                 tdb_transaction_cancel_internal(tdb);
864                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
865                 return -1;
866         }
867
868
869         if (tdb->transaction->nesting != 0) {
870                 tdb->transaction->nesting--;
871                 return 0;
872         }               
873
874         /* check for a null transaction */
875         if (tdb->transaction->blocks == NULL) {
876                 tdb_transaction_cancel_internal(tdb);
877                 return 0;
878         }
879
880         methods = tdb->transaction->io_methods;
881         
882         /* if there are any locks pending then the caller has not
883            nested their locks properly, so fail the transaction */
884         if (tdb->num_locks || tdb->global_lock.count) {
885                 tdb->ecode = TDB_ERR_LOCK;
886                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
887                 tdb_transaction_cancel_internal(tdb);
888                 return -1;
889         }
890
891         /* upgrade the main transaction lock region to a write lock */
892         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
893                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
894                 tdb->ecode = TDB_ERR_LOCK;
895                 tdb_transaction_cancel_internal(tdb);
896                 return -1;
897         }
898
899         /* get the global lock - this prevents new users attaching to the database
900            during the commit */
901         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
902                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
903                 tdb->ecode = TDB_ERR_LOCK;
904                 tdb_transaction_cancel_internal(tdb);
905                 return -1;
906         }
907
908         if (!(tdb->flags & TDB_NOSYNC)) {
909                 /* write the recovery data to the end of the file */
910                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
911                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
912                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
913                         tdb_transaction_cancel_internal(tdb);
914                         return -1;
915                 }
916         }
917
918         /* expand the file to the new size if needed */
919         if (tdb->map_size != tdb->transaction->old_map_size) {
920                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
921                                              tdb->map_size - 
922                                              tdb->transaction->old_map_size) == -1) {
923                         tdb->ecode = TDB_ERR_IO;
924                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
925                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
926                         tdb_transaction_cancel_internal(tdb);
927                         return -1;
928                 }
929                 tdb->map_size = tdb->transaction->old_map_size;
930                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
931         }
932
933         /* perform all the writes */
934         for (i=0;i<tdb->transaction->num_blocks;i++) {
935                 tdb_off_t offset;
936                 tdb_len_t length;
937
938                 if (tdb->transaction->blocks[i] == NULL) {
939                         continue;
940                 }
941
942                 offset = i * tdb->transaction->block_size;
943                 length = tdb->transaction->block_size;
944                 if (i == tdb->transaction->num_blocks-1) {
945                         length = tdb->transaction->last_block_size;
946                 }
947
948                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
949                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
950                         
951                         /* we've overwritten part of the data and
952                            possibly expanded the file, so we need to
953                            run the crash recovery code */
954                         tdb->methods = methods;
955                         tdb_transaction_recover(tdb); 
956
957                         tdb_transaction_cancel_internal(tdb);
958                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
959
960                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
961                         return -1;
962                 }
963                 SAFE_FREE(tdb->transaction->blocks[i]);
964         } 
965
966         SAFE_FREE(tdb->transaction->blocks);
967         tdb->transaction->num_blocks = 0;
968
969         if (!(tdb->flags & TDB_NOSYNC)) {
970                 /* ensure the new data is on disk */
971                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
972                         return -1;
973                 }
974
975                 /* remove the recovery marker */
976                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
977                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
978                         return -1;
979                 }
980
981                 /* ensure the recovery marker has been removed on disk */
982                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
983                         return -1;
984                 }
985         }
986
987         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
988
989         /*
990           TODO: maybe write to some dummy hdr field, or write to magic
991           offset without mmap, before the last sync, instead of the
992           utime() call
993         */
994
995         /* on some systems (like Linux 2.6.x) changes via mmap/msync
996            don't change the mtime of the file, this means the file may
997            not be backed up (as tdb rounding to block sizes means that
998            file size changes are quite rare too). The following forces
999            mtime changes when a transaction completes */
1000 #if HAVE_UTIME
1001         utime(tdb->name, NULL);
1002 #endif
1003
1004         /* use a transaction cancel to free memory and remove the
1005            transaction locks */
1006         tdb_transaction_cancel_internal(tdb);
1007
1008         return 0;
1009 }
1010
1011
1012 /*
1013   recover from an aborted transaction. Must be called with exclusive
1014   database write access already established (including the global
1015   lock to prevent new processes attaching)
1016 */
1017 int tdb_transaction_recover(struct tdb_context *tdb)
1018 {
1019         tdb_off_t recovery_head, recovery_eof;
1020         unsigned char *data, *p;
1021         uint32_t zero = 0;
1022         struct list_struct rec;
1023
1024         /* find the recovery area */
1025         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1026                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1027                 tdb->ecode = TDB_ERR_IO;
1028                 return -1;
1029         }
1030
1031         if (recovery_head == 0) {
1032                 /* we have never allocated a recovery record */
1033                 return 0;
1034         }
1035
1036         /* read the recovery record */
1037         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1038                                    sizeof(rec), DOCONV()) == -1) {
1039                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1040                 tdb->ecode = TDB_ERR_IO;
1041                 return -1;
1042         }
1043
1044         if (rec.magic != TDB_RECOVERY_MAGIC) {
1045                 /* there is no valid recovery data */
1046                 return 0;
1047         }
1048
1049         if (tdb->read_only) {
1050                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1051                 tdb->ecode = TDB_ERR_CORRUPT;
1052                 return -1;
1053         }
1054
1055         recovery_eof = rec.key_len;
1056
1057         data = (unsigned char *)malloc(rec.data_len);
1058         if (data == NULL) {
1059                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1060                 tdb->ecode = TDB_ERR_OOM;
1061                 return -1;
1062         }
1063
1064         /* read the full recovery data */
1065         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1066                                    rec.data_len, 0) == -1) {
1067                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1068                 tdb->ecode = TDB_ERR_IO;
1069                 return -1;
1070         }
1071
1072         /* recover the file data */
1073         p = data;
1074         while (p+8 < data + rec.data_len) {
1075                 uint32_t ofs, len;
1076                 if (DOCONV()) {
1077                         tdb_convert(p, 8);
1078                 }
1079                 memcpy(&ofs, p, 4);
1080                 memcpy(&len, p+4, 4);
1081
1082                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1083                         free(data);
1084                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1085                         tdb->ecode = TDB_ERR_IO;
1086                         return -1;
1087                 }
1088                 p += 8 + len;
1089         }
1090
1091         free(data);
1092
1093         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1094                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1095                 tdb->ecode = TDB_ERR_IO;
1096                 return -1;
1097         }
1098
1099         /* if the recovery area is after the recovered eof then remove it */
1100         if (recovery_eof <= recovery_head) {
1101                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1102                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1103                         tdb->ecode = TDB_ERR_IO;
1104                         return -1;                      
1105                 }
1106         }
1107
1108         /* remove the recovery magic */
1109         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1110                           &zero) == -1) {
1111                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1112                 tdb->ecode = TDB_ERR_IO;
1113                 return -1;                      
1114         }
1115         
1116         /* reduce the file size to the old size */
1117         tdb_munmap(tdb);
1118         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1119                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1120                 tdb->ecode = TDB_ERR_IO;
1121                 return -1;                      
1122         }
1123         tdb->map_size = recovery_eof;
1124         tdb_mmap(tdb);
1125
1126         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1127                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1128                 tdb->ecode = TDB_ERR_IO;
1129                 return -1;
1130         }
1131
1132         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1133                  recovery_eof));
1134
1135         /* all done */
1136         return 0;
1137 }