X-Git-Url: http://git.ozlabs.org/?p=ccan;a=blobdiff_plain;f=ccan%2Ftdb2%2Ftransaction.c;h=55b7fd60dc9c38af5298dc0e87817ca1fa3bf952;hp=2cab4655661768a80e44593211f152dd3c73175c;hb=18fe5ef012a96014b9e61e48616e682b4a5708a2;hpb=c8c9a4693b9fbe2a6b2b6dfced49958db5c7fdd4 diff --git a/ccan/tdb2/transaction.c b/ccan/tdb2/transaction.c index 2cab4655..55b7fd60 100644 --- a/ccan/tdb2/transaction.c +++ b/ccan/tdb2/transaction.c @@ -88,7 +88,6 @@ fsync/msync calls are made. */ - /* hold the context of any current transaction */ @@ -110,7 +109,7 @@ struct tdb_transaction { /* when inside a transaction we need to keep track of any nested tdb_transaction_start() calls, as these are allowed, but don't create a new transaction */ - int nesting; + unsigned int nesting; /* set when a prepare has already occurred */ bool prepared; @@ -120,6 +119,8 @@ struct tdb_transaction { tdb_len_t old_map_size; }; +/* This doesn't really need to be pagesize, but we use it for similar reasons. */ +#define PAGESIZE 65536 /* read while in a transaction. We need to check first if the data is in our list @@ -132,8 +133,8 @@ static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off, enum TDB_ERROR ecode; /* break it down into block sized ops */ - while (len + (off % getpagesize()) > getpagesize()) { - tdb_len_t len2 = getpagesize() - (off % getpagesize()); + while (len + (off % PAGESIZE) > PAGESIZE) { + tdb_len_t len2 = PAGESIZE - (off % PAGESIZE); ecode = transaction_read(tdb, off, buf, len2); if (ecode != TDB_SUCCESS) { return ecode; @@ -147,7 +148,7 @@ static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off, return TDB_SUCCESS; } - blk = off / getpagesize(); + blk = off / PAGESIZE; /* see if we have it in the block list */ if (tdb->transaction->num_blocks <= blk || @@ -169,7 +170,7 @@ static enum TDB_ERROR transaction_read(struct tdb_context *tdb, tdb_off_t off, } /* now copy it out of this block */ - memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len); + memcpy(buf, tdb->transaction->blocks[blk] + (off % PAGESIZE), len); return TDB_SUCCESS; fail: @@ -198,8 +199,8 @@ static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off, } /* break it up into block sized chunks */ - while (len + (off % getpagesize()) > getpagesize()) { - tdb_len_t len2 = getpagesize() - (off % getpagesize()); + while (len + (off % PAGESIZE) > PAGESIZE) { + tdb_len_t len2 = PAGESIZE - (off % PAGESIZE); ecode = transaction_write(tdb, off, buf, len2); if (ecode != TDB_SUCCESS) { return -1; @@ -215,8 +216,8 @@ static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off, return TDB_SUCCESS; } - blk = off / getpagesize(); - off = off % getpagesize(); + blk = off / PAGESIZE; + off = off % PAGESIZE; if (tdb->transaction->num_blocks <= blk) { uint8_t **new_blocks; @@ -244,20 +245,20 @@ static enum TDB_ERROR transaction_write(struct tdb_context *tdb, tdb_off_t off, /* allocate and fill a block? */ if (tdb->transaction->blocks[blk] == NULL) { - tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1); + tdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1); if (tdb->transaction->blocks[blk] == NULL) { ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR, "transaction_write:" " failed to allocate"); goto fail; } - if (tdb->transaction->old_map_size > blk * getpagesize()) { - tdb_len_t len2 = getpagesize(); - if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) { - len2 = tdb->transaction->old_map_size - (blk * getpagesize()); + if (tdb->transaction->old_map_size > blk * PAGESIZE) { + tdb_len_t len2 = PAGESIZE; + if (len2 + (blk * PAGESIZE) > tdb->transaction->old_map_size) { + len2 = tdb->transaction->old_map_size - (blk * PAGESIZE); } ecode = tdb->transaction->io_methods->tread(tdb, - blk * getpagesize(), + blk * PAGESIZE, tdb->transaction->blocks[blk], len2); if (ecode != TDB_SUCCESS) { @@ -306,8 +307,8 @@ static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, size_t blk; /* break it up into block sized chunks */ - while (len + (off % getpagesize()) > getpagesize()) { - tdb_len_t len2 = getpagesize() - (off % getpagesize()); + while (len + (off % PAGESIZE) > PAGESIZE) { + tdb_len_t len2 = PAGESIZE - (off % PAGESIZE); transaction_write_existing(tdb, off, buf, len2); len -= len2; off += len2; @@ -320,8 +321,8 @@ static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, return; } - blk = off / getpagesize(); - off = off % getpagesize(); + blk = off / PAGESIZE; + off = off % PAGESIZE; if (tdb->transaction->num_blocks <= blk || tdb->transaction->blocks[blk] == NULL) { @@ -347,14 +348,14 @@ static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe) { - if (len <= tdb->map_size) { + if (len <= tdb->file->map_size) { return TDB_SUCCESS; } if (!probe) { tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR, "tdb_oob len %lld beyond transaction size %lld", (long long)len, - (long long)tdb->map_size); + (long long)tdb->file->map_size); } return TDB_ERR_IO; } @@ -369,9 +370,9 @@ static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb, /* add a write to the transaction elements, so subsequent reads see the zero data */ - ecode = transaction_write(tdb, tdb->map_size, NULL, addition); + ecode = transaction_write(tdb, tdb->file->map_size, NULL, addition); if (ecode == TDB_SUCCESS) { - tdb->map_size += addition; + tdb->file->map_size += addition; } return ecode; } @@ -379,10 +380,10 @@ static enum TDB_ERROR transaction_expand_file(struct tdb_context *tdb, static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off, size_t len, bool write_mode) { - size_t blk = off / getpagesize(), end_blk; + size_t blk = off / PAGESIZE, end_blk; /* This is wrong for zero-length blocks, but will fail gracefully */ - end_blk = (off + len - 1) / getpagesize(); + end_blk = (off + len - 1) / PAGESIZE; /* Can only do direct if in single block and we've already copied. */ if (write_mode) { @@ -392,17 +393,17 @@ static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off, return NULL; if (tdb->transaction->blocks[blk] == NULL) return NULL; - return tdb->transaction->blocks[blk] + off % getpagesize(); + return tdb->transaction->blocks[blk] + off % PAGESIZE; } /* Single which we have copied? */ if (blk == end_blk && blk < tdb->transaction->num_blocks && tdb->transaction->blocks[blk]) - return tdb->transaction->blocks[blk] + off % getpagesize(); + return tdb->transaction->blocks[blk] + off % PAGESIZE; /* Otherwise must be all not copied. */ - while (blk < end_blk) { + while (blk <= end_blk) { if (blk >= tdb->transaction->num_blocks) break; if (tdb->transaction->blocks[blk]) @@ -430,15 +431,15 @@ static enum TDB_ERROR transaction_sync(struct tdb_context *tdb, return TDB_SUCCESS; } - if (fsync(tdb->fd) != 0) { + if (fsync(tdb->file->fd) != 0) { return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR, "tdb_transaction: fsync failed: %s", strerror(errno)); } #ifdef MS_SYNC - if (tdb->map_ptr) { + if (tdb->file->map_ptr) { tdb_off_t moffset = offset & ~(getpagesize()-1); - if (msync(moffset + (char *)tdb->map_ptr, + if (msync(moffset + (char *)tdb->file->map_ptr, length + (offset - moffset), MS_SYNC) != 0) { return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR, "tdb_transaction: msync failed: %s", @@ -467,7 +468,7 @@ static void _tdb_transaction_cancel(struct tdb_context *tdb) return; } - tdb->map_size = tdb->transaction->old_map_size; + tdb->file->map_size = tdb->transaction->old_map_size; /* free all the transaction blocks */ for (i=0;itransaction->num_blocks;i++) { @@ -495,8 +496,8 @@ static void _tdb_transaction_cancel(struct tdb_context *tdb) } } - if (tdb->allrecord_lock.count) - tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype); + if (tdb->file->allrecord_lock.count) + tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype); /* restore the normal io methods */ tdb->methods = tdb->transaction->io_methods; @@ -519,32 +520,46 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb) /* some sanity checks */ if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) { - return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR, - "tdb_transaction_start: cannot start a" - " transaction on a read-only or internal db"); + return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, + TDB_LOG_USE_ERROR, + "tdb_transaction_start:" + " cannot start a" + " transaction on a " + "read-only or internal db"); } /* cope with nested tdb_transaction_start() calls */ if (tdb->transaction != NULL) { - return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_USE_ERROR, - "tdb_transaction_start:" - " already inside transaction"); + if (!(tdb->flags & TDB_ALLOW_NESTING)) { + return tdb->last_error + = tdb_logerr(tdb, TDB_ERR_IO, + TDB_LOG_USE_ERROR, + "tdb_transaction_start:" + " already inside transaction"); + } + tdb->transaction->nesting++; + return 0; } if (tdb_has_hash_locks(tdb)) { /* the caller must not have any locks when starting a transaction as otherwise we'll be screwed by lack of nested locks in POSIX */ - return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR, - "tdb_transaction_start: cannot start a" - " transaction with locks held"); + return tdb->last_error = tdb_logerr(tdb, TDB_ERR_LOCK, + TDB_LOG_USE_ERROR, + "tdb_transaction_start:" + " cannot start a" + " transaction with locks" + " held"); } tdb->transaction = (struct tdb_transaction *) calloc(sizeof(struct tdb_transaction), 1); if (tdb->transaction == NULL) { - return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR, - "tdb_transaction_start: cannot allocate"); + return tdb->last_error = tdb_logerr(tdb, TDB_ERR_OOM, + TDB_LOG_ERROR, + "tdb_transaction_start:" + " cannot allocate"); } /* get the transaction write lock. This is a blocking lock. As @@ -554,7 +569,7 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb) if (ecode != TDB_SUCCESS) { SAFE_FREE(tdb->transaction->blocks); SAFE_FREE(tdb->transaction); - return ecode; + return tdb->last_error = ecode; } /* get a read lock over entire file. This is upgraded to a write @@ -566,20 +581,20 @@ enum TDB_ERROR tdb_transaction_start(struct tdb_context *tdb) /* make sure we know about any file expansions already done by anyone else */ - tdb->methods->oob(tdb, tdb->map_size + 1, true); - tdb->transaction->old_map_size = tdb->map_size; + tdb->methods->oob(tdb, tdb->file->map_size + 1, true); + tdb->transaction->old_map_size = tdb->file->map_size; /* finally hook the io methods, replacing them with transaction specific methods */ tdb->transaction->io_methods = tdb->methods; tdb->methods = &transaction_methods; - return TDB_SUCCESS; + return tdb->last_error = TDB_SUCCESS; fail_allrecord_lock: tdb_transaction_unlock(tdb, F_WRLCK); SAFE_FREE(tdb->transaction->blocks); SAFE_FREE(tdb->transaction); - return ecode; + return tdb->last_error = ecode; } @@ -592,16 +607,16 @@ void tdb_transaction_cancel(struct tdb_context *tdb) } /* - work out how much space the linearised recovery data will consume + work out how much space the linearised recovery data will consume (worst case) */ static tdb_len_t tdb_recovery_size(struct tdb_context *tdb) { tdb_len_t recovery_size = 0; int i; - recovery_size = sizeof(tdb_len_t); + recovery_size = 0; for (i=0;itransaction->num_blocks;i++) { - if (i * getpagesize() >= tdb->transaction->old_map_size) { + if (i * PAGESIZE >= tdb->transaction->old_map_size) { break; } if (tdb->transaction->blocks[i] == NULL) { @@ -611,94 +626,204 @@ static tdb_len_t tdb_recovery_size(struct tdb_context *tdb) if (i == tdb->transaction->num_blocks-1) { recovery_size += tdb->transaction->last_block_size; } else { - recovery_size += getpagesize(); + recovery_size += PAGESIZE; } } return recovery_size; } -/* - allocate the recovery area, or use an existing recovery area if it is - large enough -*/ -static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb, - tdb_len_t *recovery_size, - tdb_off_t *recovery_offset, - tdb_len_t *recovery_max_size) +static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb, + const struct tdb_methods *methods, + tdb_off_t *recovery_offset, + struct tdb_recovery_record *rec) { - struct tdb_recovery_record rec; - const struct tdb_methods *methods = tdb->transaction->io_methods; - tdb_off_t recovery_head; - size_t addition; enum TDB_ERROR ecode; - recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery)); - if (TDB_OFF_IS_ERR(recovery_head)) { - return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR, - "tdb_recovery_allocate:" - " failed to read recovery head"); + *recovery_offset = tdb_read_off(tdb, + offsetof(struct tdb_header, recovery)); + if (TDB_OFF_IS_ERR(*recovery_offset)) { + return *recovery_offset; } - if (recovery_head != 0) { - ecode = methods->tread(tdb, recovery_head, &rec, sizeof(rec)); - if (ecode != TDB_SUCCESS) { - return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_recovery_allocate:" - " failed to read recovery record"); - } - tdb_convert(tdb, &rec, sizeof(rec)); - /* ignore invalid recovery regions: can happen in crash */ - if (rec.magic != TDB_RECOVERY_MAGIC && - rec.magic != TDB_RECOVERY_INVALID_MAGIC) { - recovery_head = 0; + if (*recovery_offset == 0) { + rec->max_len = 0; + return TDB_SUCCESS; + } + + ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec)); + if (ecode != TDB_SUCCESS) + return ecode; + + tdb_convert(tdb, rec, sizeof(*rec)); + /* ignore invalid recovery regions: can happen in crash */ + if (rec->magic != TDB_RECOVERY_MAGIC && + rec->magic != TDB_RECOVERY_INVALID_MAGIC) { + *recovery_offset = 0; + rec->max_len = 0; + } + return TDB_SUCCESS; +} + +static unsigned int same(const unsigned char *new, + const unsigned char *old, + unsigned int length) +{ + unsigned int i; + + for (i = 0; i < length; i++) { + if (new[i] != old[i]) + break; + } + return i; +} + +static unsigned int different(const unsigned char *new, + const unsigned char *old, + unsigned int length, + unsigned int min_same, + unsigned int *samelen) +{ + unsigned int i; + + *samelen = 0; + for (i = 0; i < length; i++) { + if (new[i] == old[i]) { + (*samelen)++; + } else { + if (*samelen >= min_same) { + return i - *samelen; + } + *samelen = 0; } } - *recovery_size = tdb_recovery_size(tdb); + if (*samelen < min_same) + *samelen = 0; + return length - *samelen; +} - if (recovery_head != 0 && *recovery_size <= rec.max_len) { - /* it fits in the existing area */ - *recovery_max_size = rec.max_len; - *recovery_offset = recovery_head; - return TDB_SUCCESS; +/* Allocates recovery blob, without tdb_recovery_record at head set up. */ +static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb, + tdb_len_t *len) +{ + struct tdb_recovery_record *rec; + size_t i; + enum TDB_ERROR ecode; + unsigned char *p; + const struct tdb_methods *old_methods = tdb->methods; + + rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb)); + if (!rec) { + tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR, + "transaction_setup_recovery:" + " cannot allocate"); + return TDB_ERR_PTR(TDB_ERR_OOM); } - /* we need to free up the old recovery area, then allocate a - new one at the end of the file. Note that we cannot use - normal allocation to allocate the new one as that might return - us an area that is being currently used (as of the start of - the transaction) */ - if (recovery_head != 0) { - add_stat(tdb, frees, 1); - ecode = add_free_record(tdb, recovery_head, - sizeof(rec) + rec.max_len); - if (ecode != TDB_SUCCESS) { - return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_recovery_allocate:" - " failed to free previous" - " recovery area"); + /* We temporarily revert to the old I/O methods, so we can use + * tdb_access_read */ + tdb->methods = tdb->transaction->io_methods; + + /* build the recovery data into a single blob to allow us to do a single + large write, which should be more efficient */ + p = (unsigned char *)(rec + 1); + for (i=0;itransaction->num_blocks;i++) { + tdb_off_t offset; + tdb_len_t length; + unsigned int off; + const unsigned char *buffer; + + if (tdb->transaction->blocks[i] == NULL) { + continue; + } + + offset = i * PAGESIZE; + length = PAGESIZE; + if (i == tdb->transaction->num_blocks-1) { + length = tdb->transaction->last_block_size; + } + + if (offset >= tdb->transaction->old_map_size) { + continue; + } + + if (offset + length > tdb->file->map_size) { + ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR, + "tdb_transaction_setup_recovery:" + " transaction data over new region" + " boundary"); + goto fail; } + if (offset + length > tdb->transaction->old_map_size) { + /* Short read at EOF. */ + length = tdb->transaction->old_map_size - offset; + } + buffer = tdb_access_read(tdb, offset, length, false); + if (TDB_PTR_IS_ERR(buffer)) { + ecode = TDB_PTR_ERR(buffer); + goto fail; + } + + /* Skip over anything the same at the start. */ + off = same(tdb->transaction->blocks[i], buffer, length); + offset += off; + + while (off < length) { + tdb_len_t len; + unsigned int samelen; + + len = different(tdb->transaction->blocks[i] + off, + buffer + off, length - off, + sizeof(offset) + sizeof(len) + 1, + &samelen); + + memcpy(p, &offset, sizeof(offset)); + memcpy(p + sizeof(offset), &len, sizeof(len)); + tdb_convert(tdb, p, sizeof(offset) + sizeof(len)); + p += sizeof(offset) + sizeof(len); + memcpy(p, buffer + off, len); + p += len; + off += len + samelen; + offset += len + samelen; + } + tdb_access_release(tdb, buffer); } - /* the tdb_free() call might have increased the recovery size */ - *recovery_size = tdb_recovery_size(tdb); + *len = p - (unsigned char *)(rec + 1); + tdb->methods = old_methods; + return rec; - /* round up to a multiple of page size */ - *recovery_max_size - = (((sizeof(rec) + *recovery_size) + getpagesize()-1) - & ~(getpagesize()-1)) - - sizeof(rec); - *recovery_offset = tdb->map_size; - recovery_head = *recovery_offset; +fail: + free(rec); + tdb->methods = old_methods; + return TDB_ERR_PTR(ecode); +} + +static tdb_off_t create_recovery_area(struct tdb_context *tdb, + tdb_len_t rec_length, + struct tdb_recovery_record *rec) +{ + tdb_off_t off, recovery_off; + tdb_len_t addition; + enum TDB_ERROR ecode; + const struct tdb_methods *methods = tdb->transaction->io_methods; + + /* round up to a multiple of page size. Overallocate, since each + * such allocation forces us to expand the file. */ + rec->max_len + = (((sizeof(*rec) + rec_length + rec_length / 2) + + PAGESIZE-1) & ~(PAGESIZE-1)) + - sizeof(*rec); + off = tdb->file->map_size; /* Restore ->map_size before calling underlying expand_file. Also so that we don't try to expand the file again in the transaction commit, which would destroy the recovery area */ - addition = (tdb->map_size - tdb->transaction->old_map_size) + - sizeof(rec) + *recovery_max_size; - tdb->map_size = tdb->transaction->old_map_size; + addition = (tdb->file->map_size - tdb->transaction->old_map_size) + + sizeof(*rec) + rec->max_len; + tdb->file->map_size = tdb->transaction->old_map_size; ecode = methods->expand_file(tdb, addition); if (ecode != TDB_SUCCESS) { return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, @@ -709,163 +834,122 @@ static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb, /* we have to reset the old map size so that we don't try to expand the file again in the transaction commit, which would destroy the recovery area */ - tdb->transaction->old_map_size = tdb->map_size; + tdb->transaction->old_map_size = tdb->file->map_size; /* write the recovery header offset and sync - we can sync without a race here as the magic ptr in the recovery record has not been set */ - tdb_convert(tdb, &recovery_head, sizeof(recovery_head)); + recovery_off = off; + tdb_convert(tdb, &recovery_off, sizeof(recovery_off)); ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery), - &recovery_head, sizeof(tdb_off_t)); + &recovery_off, sizeof(tdb_off_t)); if (ecode != TDB_SUCCESS) { return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, "tdb_recovery_allocate:" " failed to write recovery head"); } transaction_write_existing(tdb, offsetof(struct tdb_header, recovery), - &recovery_head, + &recovery_off, sizeof(tdb_off_t)); - return TDB_SUCCESS; -} - -/* Set up header for the recovery record. */ -static void set_recovery_header(struct tdb_recovery_record *rec, - uint64_t magic, - uint64_t datalen, uint64_t actuallen, - uint64_t oldsize) -{ - rec->magic = magic; - rec->max_len = actuallen; - rec->len = datalen; - rec->eof = oldsize; + return off; } /* setup the recovery data that will be used on a crash during commit */ -static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb, - tdb_off_t *magic_offset) +static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb) { - /* Initialized for GCC's 4.4.5 overzealous uninitialized warnings. */ tdb_len_t recovery_size = 0; - tdb_off_t recovery_offset = 0, recovery_max_size = 0; - unsigned char *data, *p; - const struct tdb_methods *methods = tdb->transaction->io_methods; - struct tdb_recovery_record *rec; + tdb_off_t recovery_off = 0; tdb_off_t old_map_size = tdb->transaction->old_map_size; - uint64_t magic, tailer; - int i; + struct tdb_recovery_record *recovery; + const struct tdb_methods *methods = tdb->transaction->io_methods; + uint64_t magic; enum TDB_ERROR ecode; - /* - check that the recovery area has enough space - */ - ecode = tdb_recovery_allocate(tdb, &recovery_size, - &recovery_offset, &recovery_max_size); - if (ecode != TDB_SUCCESS) { - return ecode; - } + recovery = alloc_recovery(tdb, &recovery_size); + if (TDB_PTR_IS_ERR(recovery)) + return TDB_PTR_ERR(recovery); - data = (unsigned char *)malloc(recovery_size + sizeof(*rec)); - if (data == NULL) { - return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR, - "transaction_setup_recovery:" - " cannot allocate"); + ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery); + if (ecode) { + free(recovery); + return ecode; } - rec = (struct tdb_recovery_record *)data; - set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC, - recovery_size, recovery_max_size, old_map_size); - tdb_convert(tdb, rec, sizeof(*rec)); - - /* build the recovery data into a single blob to allow us to do a single - large write, which should be more efficient */ - p = data + sizeof(*rec); - for (i=0;itransaction->num_blocks;i++) { - tdb_off_t offset; - tdb_len_t length; - - if (tdb->transaction->blocks[i] == NULL) { - continue; - } + if (recovery->max_len < recovery_size) { + /* Not large enough. Free up old recovery area. */ + if (recovery_off) { + tdb->stats.frees++; + ecode = add_free_record(tdb, recovery_off, + sizeof(*recovery) + + recovery->max_len, + TDB_LOCK_WAIT, true); + free(recovery); + if (ecode != TDB_SUCCESS) { + return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, + "tdb_recovery_allocate:" + " failed to free previous" + " recovery area"); + } - offset = i * getpagesize(); - length = getpagesize(); - if (i == tdb->transaction->num_blocks-1) { - length = tdb->transaction->last_block_size; + /* Refresh recovery after add_free_record above. */ + recovery = alloc_recovery(tdb, &recovery_size); + if (TDB_PTR_IS_ERR(recovery)) + return TDB_PTR_ERR(recovery); } - if (offset >= old_map_size) { - continue; - } - if (offset + length > tdb->map_size) { - free(data); - return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR, - "tdb_transaction_setup_recovery:" - " transaction data over new region" - " boundary"); - } - memcpy(p, &offset, sizeof(offset)); - memcpy(p + sizeof(offset), &length, sizeof(length)); - tdb_convert(tdb, p, sizeof(offset) + sizeof(length)); - - /* the recovery area contains the old data, not the - new data, so we have to call the original tdb_read - method to get it */ - ecode = methods->tread(tdb, offset, - p + sizeof(offset) + sizeof(length), - length); - if (ecode != TDB_SUCCESS) { - free(data); - return ecode; + recovery_off = create_recovery_area(tdb, recovery_size, + recovery); + if (TDB_OFF_IS_ERR(recovery_off)) { + free(recovery); + return recovery_off; } - p += sizeof(offset) + sizeof(length) + length; } - /* and the tailer */ - tailer = sizeof(*rec) + recovery_max_size; - memcpy(p, &tailer, sizeof(tailer)); - tdb_convert(tdb, p, sizeof(tailer)); + /* Now we know size, convert rec header. */ + recovery->magic = TDB_RECOVERY_INVALID_MAGIC; + recovery->len = recovery_size; + recovery->eof = old_map_size; + tdb_convert(tdb, recovery, sizeof(*recovery)); /* write the recovery data to the recovery area */ - ecode = methods->twrite(tdb, recovery_offset, data, - sizeof(*rec) + recovery_size); + ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size); if (ecode != TDB_SUCCESS) { - free(data); + free(recovery); return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, "tdb_transaction_setup_recovery:" " failed to write recovery data"); } - transaction_write_existing(tdb, recovery_offset, data, - sizeof(*rec) + recovery_size); + transaction_write_existing(tdb, recovery_off, recovery, recovery_size); + + free(recovery); /* as we don't have ordered writes, we have to sync the recovery data before we update the magic to indicate that the recovery data is present */ - ecode = transaction_sync(tdb, recovery_offset, - sizeof(*rec) + recovery_size); - if (ecode != TDB_SUCCESS) { - free(data); + ecode = transaction_sync(tdb, recovery_off, recovery_size); + if (ecode != TDB_SUCCESS) return ecode; - } - - free(data); magic = TDB_RECOVERY_MAGIC; tdb_convert(tdb, &magic, sizeof(magic)); - *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record, - magic); + tdb->transaction->magic_offset + = recovery_off + offsetof(struct tdb_recovery_record, magic); - ecode = methods->twrite(tdb, *magic_offset, &magic, sizeof(magic)); + ecode = methods->twrite(tdb, tdb->transaction->magic_offset, + &magic, sizeof(magic)); if (ecode != TDB_SUCCESS) { return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, "tdb_transaction_setup_recovery:" " failed to write recovery magic"); } - transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)); + transaction_write_existing(tdb, tdb->transaction->magic_offset, + &magic, sizeof(magic)); /* ensure the recovery magic marker is on disk */ - return transaction_sync(tdb, *magic_offset, sizeof(magic)); + return transaction_sync(tdb, tdb->transaction->magic_offset, + sizeof(magic)); } static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) @@ -895,7 +979,6 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) if (tdb->transaction->nesting != 0) { - tdb->transaction->nesting--; return TDB_SUCCESS; } @@ -909,10 +992,6 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) /* upgrade the main transaction lock region to a write lock */ ecode = tdb_allrecord_upgrade(tdb); if (ecode != TDB_SUCCESS) { - tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_transaction_prepare_commit:" - " failed to upgrade hash locks"); - _tdb_transaction_cancel(tdb); return ecode; } @@ -920,24 +999,15 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) during the commit */ ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK); if (ecode != TDB_SUCCESS) { - tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_transaction_prepare_commit:" - " failed to get open lock"); - _tdb_transaction_cancel(tdb); return ecode; } /* Since we have whole db locked, we don't need the expansion lock. */ if (!(tdb->flags & TDB_NOSYNC)) { - /* write the recovery data to the end of the file */ - ecode = transaction_setup_recovery(tdb, - &tdb->transaction - ->magic_offset); + /* Sets up tdb->transaction->recovery and + * tdb->transaction->magic_offset. */ + ecode = transaction_setup_recovery(tdb); if (ecode != TDB_SUCCESS) { - tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_transaction_prepare_commit:" - " failed to setup recovery data"); - _tdb_transaction_cancel(tdb); return ecode; } } @@ -945,16 +1015,14 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) tdb->transaction->prepared = true; /* expand the file to the new size if needed */ - if (tdb->map_size != tdb->transaction->old_map_size) { - tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size; + if (tdb->file->map_size != tdb->transaction->old_map_size) { + tdb_len_t add; + + add = tdb->file->map_size - tdb->transaction->old_map_size; /* Restore original map size for tdb_expand_file */ - tdb->map_size = tdb->transaction->old_map_size; + tdb->file->map_size = tdb->transaction->old_map_size; ecode = methods->expand_file(tdb, add); if (ecode != TDB_SUCCESS) { - tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_transaction_prepare_commit:" - " expansion failed"); - _tdb_transaction_cancel(tdb); return ecode; } } @@ -981,27 +1049,31 @@ enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb) enum TDB_ERROR ecode; if (tdb->transaction == NULL) { - return tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR, - "tdb_transaction_commit: no transaction"); + return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL, + TDB_LOG_USE_ERROR, + "tdb_transaction_commit:" + " no transaction"); } tdb_trace(tdb, "tdb_transaction_commit"); if (tdb->transaction->nesting != 0) { tdb->transaction->nesting--; - return TDB_SUCCESS; + return tdb->last_error = TDB_SUCCESS; } /* check for a null transaction */ if (tdb->transaction->blocks == NULL) { _tdb_transaction_cancel(tdb); - return TDB_SUCCESS; + return tdb->last_error = TDB_SUCCESS; } if (!tdb->transaction->prepared) { ecode = _tdb_transaction_prepare_commit(tdb); - if (ecode != TDB_SUCCESS) - return ecode; + if (ecode != TDB_SUCCESS) { + _tdb_transaction_cancel(tdb); + return tdb->last_error = ecode; + } } methods = tdb->transaction->io_methods; @@ -1015,8 +1087,8 @@ enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb) continue; } - offset = i * getpagesize(); - length = getpagesize(); + offset = i * PAGESIZE; + length = PAGESIZE; if (i == tdb->transaction->num_blocks-1) { length = tdb->transaction->last_block_size; } @@ -1024,10 +1096,6 @@ enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb) ecode = methods->twrite(tdb, offset, tdb->transaction->blocks[i], length); if (ecode != TDB_SUCCESS) { - tdb_logerr(tdb, ecode, TDB_LOG_ERROR, - "tdb_transaction_commit:" - " write failed during commit"); - /* we've overwritten part of the data and possibly expanded the file, so we need to run the crash recovery code */ @@ -1036,7 +1104,7 @@ enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb) _tdb_transaction_cancel(tdb); - return ecode; + return tdb->last_error = ecode; } SAFE_FREE(tdb->transaction->blocks[i]); } @@ -1045,9 +1113,9 @@ enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb) tdb->transaction->num_blocks = 0; /* ensure the new data is on disk */ - ecode = transaction_sync(tdb, 0, tdb->map_size); + ecode = transaction_sync(tdb, 0, tdb->file->map_size); if (ecode != TDB_SUCCESS) { - return ecode; + return tdb->last_error = ecode; } /* @@ -1066,10 +1134,11 @@ enum TDB_ERROR tdb_transaction_commit(struct tdb_context *tdb) #endif /* use a transaction cancel to free memory and remove the - transaction locks */ + transaction locks: it "restores" map_size, too. */ + tdb->transaction->old_map_size = tdb->file->map_size; _tdb_transaction_cancel(tdb); - return TDB_SUCCESS; + return tdb->last_error = TDB_SUCCESS; } @@ -1159,7 +1228,7 @@ enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb) free(data); - ecode = transaction_sync(tdb, 0, tdb->map_size); + ecode = transaction_sync(tdb, 0, tdb->file->map_size); if (ecode != TDB_SUCCESS) { return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, "tdb_transaction_recover:"