};
/* This doesn't really need to be pagesize, but we use it for similar reasons. */
-#define PAGESIZE 4096
+#define PAGESIZE 65536
/*
read while in a transaction. We need to check first if the data is in our list
static enum TDB_ERROR transaction_oob(struct tdb_context *tdb, tdb_off_t len,
bool probe)
{
- if (len <= tdb->file->map_size) {
+ if (len <= tdb->file->map_size || probe) {
return TDB_SUCCESS;
}
- if (!probe) {
- tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
- "tdb_oob len %lld beyond transaction size %lld",
- (long long)len,
- (long long)tdb->file->map_size);
- }
+
+ tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+ "tdb_oob len %lld beyond transaction size %lld",
+ (long long)len,
+ (long long)tdb->file->map_size);
return TDB_ERR_IO;
}
/* Can only do direct if in single block and we've already copied. */
if (write_mode) {
- if (blk != end_blk)
- return NULL;
- if (blk >= tdb->transaction->num_blocks)
- return NULL;
- if (tdb->transaction->blocks[blk] == NULL)
+ tdb->stats.transaction_write_direct++;
+ if (blk != end_blk
+ || blk >= tdb->transaction->num_blocks
+ || tdb->transaction->blocks[blk] == NULL) {
+ tdb->stats.transaction_write_direct_fail++;
return NULL;
+ }
return tdb->transaction->blocks[blk] + off % PAGESIZE;
}
+ tdb->stats.transaction_read_direct++;
/* Single which we have copied? */
if (blk == end_blk
&& blk < tdb->transaction->num_blocks
while (blk <= end_blk) {
if (blk >= tdb->transaction->num_blocks)
break;
- if (tdb->transaction->blocks[blk])
+ if (tdb->transaction->blocks[blk]) {
+ tdb->stats.transaction_read_direct_fail++;
return NULL;
+ }
blk++;
}
return tdb->transaction->io_methods->direct(tdb, off, len, false);
}
#ifdef MS_SYNC
if (tdb->file->map_ptr) {
- tdb_off_t moffset = offset & ~(PAGESIZE-1);
+ tdb_off_t moffset = offset & ~(getpagesize()-1);
if (msync(moffset + (char *)tdb->file->map_ptr,
length + (offset - moffset), MS_SYNC) != 0) {
return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
tdb_transaction_unlock(tdb, F_WRLCK);
if (tdb_has_open_lock(tdb))
- tdb_unlock_open(tdb);
+ tdb_unlock_open(tdb, F_WRLCK);
SAFE_FREE(tdb->transaction);
}
{
enum TDB_ERROR ecode;
+ tdb->stats.transactions++;
/* some sanity checks */
- if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
+ if (tdb->flags & TDB_INTERNAL) {
return tdb->last_error = tdb_logerr(tdb, TDB_ERR_EINVAL,
+ TDB_LOG_USE_ERROR,
+ "tdb_transaction_start:"
+ " cannot start a"
+ " transaction on an"
+ " internal tdb");
+ }
+
+ if (tdb->flags & TDB_RDONLY) {
+ return tdb->last_error = tdb_logerr(tdb, TDB_ERR_RDONLY,
TDB_LOG_USE_ERROR,
"tdb_transaction_start:"
" cannot start a"
" transaction on a "
- "read-only or internal db");
+ " read-only tdb");
}
/* cope with nested tdb_transaction_start() calls */
" already inside transaction");
}
tdb->transaction->nesting++;
+ tdb->stats.transaction_nest++;
return 0;
}
*/
void tdb_transaction_cancel(struct tdb_context *tdb)
{
+ tdb->stats.transaction_cancel++;
_tdb_transaction_cancel(tdb);
}
size_t i;
enum TDB_ERROR ecode;
unsigned char *p;
- const struct tdb_methods *methods = tdb->transaction->io_methods;
+ const struct tdb_methods *old_methods = tdb->methods;
rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
if (!rec) {
return TDB_ERR_PTR(TDB_ERR_OOM);
}
+ /* We temporarily revert to the old I/O methods, so we can use
+ * tdb_access_read */
+ tdb->methods = tdb->transaction->io_methods;
+
/* build the recovery data into a single blob to allow us to do a single
large write, which should be more efficient */
p = (unsigned char *)(rec + 1);
tdb_off_t offset;
tdb_len_t length;
unsigned int off;
- unsigned char buffer[PAGESIZE];
+ const unsigned char *buffer;
if (tdb->transaction->blocks[i] == NULL) {
continue;
}
if (offset + length > tdb->file->map_size) {
- free(rec);
- tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
- "tdb_transaction_setup_recovery:"
- " transaction data over new region"
- " boundary");
- return TDB_ERR_PTR(TDB_ERR_CORRUPT);
+ ecode = tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
+ "tdb_transaction_setup_recovery:"
+ " transaction data over new region"
+ " boundary");
+ goto fail;
}
if (offset + length > tdb->transaction->old_map_size) {
/* Short read at EOF. */
length = tdb->transaction->old_map_size - offset;
}
- ecode = methods->tread(tdb, offset, buffer, length);
- if (ecode != TDB_SUCCESS) {
- free(rec);
- return TDB_ERR_PTR(ecode);
+ buffer = tdb_access_read(tdb, offset, length, false);
+ if (TDB_PTR_IS_ERR(buffer)) {
+ ecode = TDB_PTR_ERR(buffer);
+ goto fail;
}
/* Skip over anything the same at the start. */
off += len + samelen;
offset += len + samelen;
}
+ tdb_access_release(tdb, buffer);
}
*len = p - (unsigned char *)(rec + 1);
+ tdb->methods = old_methods;
return rec;
+
+fail:
+ free(rec);
+ tdb->methods = old_methods;
+ return TDB_ERR_PTR(ecode);
}
static tdb_off_t create_recovery_area(struct tdb_context *tdb,
addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
sizeof(*rec) + rec->max_len;
tdb->file->map_size = tdb->transaction->old_map_size;
+ tdb->stats.transaction_expand_file++;
ecode = methods->expand_file(tdb, addition);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
/* get the open lock - this prevents new users attaching to the database
during the commit */
- ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
+ ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
if (ecode != TDB_SUCCESS) {
return ecode;
}
return TDB_SUCCESS;
}
- if (tdb->read_only) {
+ if (tdb->flags & TDB_RDONLY) {
return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
"tdb_transaction_recover:"
" attempt to recover read only database");