From: Rusty Russell Date: Tue, 23 Nov 2010 01:38:21 +0000 (+1030) Subject: tdb2: transaction support X-Git-Url: http://git.ozlabs.org/?a=commitdiff_plain;ds=inline;h=5e8b9af5e7fe5f1ccac407873a3b782b8a629782;p=ccan tdb2: transaction support This adds transactions to tdb2; the code is taken from tdb1 with minimal modifications, as are the unit --- diff --git a/ccan/tdb2/check.c b/ccan/tdb2/check.c index f73d01c3..3f5c5bd7 100644 --- a/ccan/tdb2/check.c +++ b/ccan/tdb2/check.c @@ -30,7 +30,7 @@ static bool append(tdb_off_t **arr, size_t *num, tdb_off_t off) return true; } -static bool check_header(struct tdb_context *tdb) +static bool check_header(struct tdb_context *tdb, tdb_off_t *recovery) { uint64_t hash_test; struct tdb_header hdr; @@ -57,6 +57,16 @@ static bool check_header(struct tdb_context *tdb) return false; } + *recovery = hdr.recovery; + if (*recovery) { + if (*recovery < sizeof(hdr) || *recovery > tdb->map_size) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_check: invalid recovery offset %zu\n", + (size_t)*recovery); + return false; + } + } + /* Don't check reserved: they *can* be used later. */ return true; } @@ -370,23 +380,73 @@ static bool check_free_list(struct tdb_context *tdb, return true; } +/* Slow, but should be very rare. */ +static size_t dead_space(struct tdb_context *tdb, tdb_off_t off) +{ + size_t len; + + for (len = 0; off + len < tdb->map_size; len++) { + char c; + if (tdb->methods->read(tdb, off, &c, 1)) + return 0; + if (c != 0 && c != 0x43) + break; + } + return len; +} + static bool check_linear(struct tdb_context *tdb, tdb_off_t **used, size_t *num_used, - tdb_off_t **free, size_t *num_free) + tdb_off_t **free, size_t *num_free, + tdb_off_t recovery) { tdb_off_t off; tdb_len_t len; + bool found_recovery = false; for (off = sizeof(struct tdb_header); off < tdb->map_size; off += len) { union { struct tdb_used_record u; struct tdb_free_record f; + struct tdb_recovery_record r; } pad, *p; p = tdb_get(tdb, off, &pad, sizeof(pad)); if (!p) return false; - if (frec_magic(&p->f) == TDB_FREE_MAGIC - || frec_magic(&p->f) == TDB_COALESCING_MAGIC) { + + /* If we crash after ftruncate, we can get zeroes or fill. */ + if (p->r.magic == TDB_RECOVERY_INVALID_MAGIC + || p->r.magic == 0x4343434343434343ULL) { + if (recovery == off) { + found_recovery = true; + len = sizeof(p->r) + p->r.max_len; + } else { + len = dead_space(tdb, off); + if (len < sizeof(p->r)) { + tdb->log(tdb, TDB_DEBUG_ERROR, + tdb->log_priv, + "tdb_check: invalid dead space" + " at %zu\n", (size_t)off); + return false; + } + + tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv, + "Dead space at %zu-%zu (of %zu)\n", + (size_t)off, (size_t)(off + len), + (size_t)tdb->map_size); + } + } else if (p->r.magic == TDB_RECOVERY_MAGIC) { + if (recovery != off) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_check: unexpected recovery" + " record at offset %zu\n", + (size_t)off); + return false; + } + found_recovery = true; + len = sizeof(p->r) + p->r.max_len; + } else if (frec_magic(&p->f) == TDB_FREE_MAGIC + || frec_magic(&p->f) == TDB_COALESCING_MAGIC) { len = sizeof(p->u) + p->f.data_len; if (off + len > tdb->map_size) { tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, @@ -437,6 +497,15 @@ static bool check_linear(struct tdb_context *tdb, } } } + + /* We must have found recovery area if there was one. */ + if (recovery != 0 && !found_recovery) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_check: expected a recovery area at %zu\n", + (size_t)recovery); + return false; + } + return true; } @@ -445,7 +514,7 @@ int tdb_check(struct tdb_context *tdb, int (*check)(TDB_DATA key, TDB_DATA data, void *private_data), void *private_data) { - tdb_off_t *free = NULL, *used = NULL, flist; + tdb_off_t *free = NULL, *used = NULL, flist, recovery; size_t num_free = 0, num_used = 0, num_found = 0, num_flists = 0; if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, false) != 0) @@ -456,11 +525,11 @@ int tdb_check(struct tdb_context *tdb, return -1; } - if (!check_header(tdb)) + if (!check_header(tdb, &recovery)) goto fail; /* First we do a linear scan, checking all records. */ - if (!check_linear(tdb, &used, &num_used, &free, &num_free)) + if (!check_linear(tdb, &used, &num_used, &free, &num_free, recovery)) goto fail; for (flist = first_flist(tdb); flist; flist = next_flist(tdb, flist)) { diff --git a/ccan/tdb2/lock.c b/ccan/tdb2/lock.c index 60ed4637..a4cfd26c 100644 --- a/ccan/tdb2/lock.c +++ b/ccan/tdb2/lock.c @@ -190,7 +190,6 @@ static int tdb_brunlock(struct tdb_context *tdb, return ret; } -#if 0 /* upgrade a read lock to a write lock. This needs to be handled in a special way as some OSes (such as solaris) have too conservative @@ -217,8 +216,7 @@ int tdb_allrecord_upgrade(struct tdb_context *tdb) while (count--) { struct timeval tv; if (tdb_brlock(tdb, F_WRLCK, - TDB_HASH_LOCK_START - + (1ULL << tdb->header.v.hash_bits), 0, + TDB_HASH_LOCK_START, 0, TDB_LOCK_WAIT|TDB_LOCK_PROBE) == 0) { tdb->allrecord_lock.ltype = F_WRLCK; tdb->allrecord_lock.off = 0; @@ -236,7 +234,6 @@ int tdb_allrecord_upgrade(struct tdb_context *tdb) "tdb_allrecord_upgrade failed\n"); return -1; } -#endif static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb, tdb_off_t offset) @@ -251,6 +248,27 @@ static struct tdb_lock_type *find_nestlock(struct tdb_context *tdb, return NULL; } +int tdb_lock_and_recover(struct tdb_context *tdb) +{ + int ret; + + if (tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK, + false) == -1) { + return -1; + } + + if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) { + tdb_allrecord_unlock(tdb, F_WRLCK); + return -1; + } + ret = tdb_transaction_recover(tdb); + + tdb_unlock_open(tdb); + tdb_allrecord_unlock(tdb, F_WRLCK); + + return ret; +} + /* lock an offset in the database. */ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype, enum tdb_lock_flags flags) @@ -310,6 +328,21 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype, return -1; } + /* First time we grab a lock, perhaps someone died in commit? */ + if (!(flags & TDB_LOCK_NOCHECK) + && tdb->num_lockrecs == 0 + && unlikely(tdb_needs_recovery(tdb))) { + tdb_brunlock(tdb, ltype, offset, 1); + + if (tdb_lock_and_recover(tdb) == -1) { + return -1; + } + + if (tdb_brlock(tdb, ltype, offset, 1, flags)) { + return -1; + } + } + tdb->lockrecs[tdb->num_lockrecs].off = offset; tdb->lockrecs[tdb->num_lockrecs].count = 1; tdb->lockrecs[tdb->num_lockrecs].ltype = ltype; @@ -318,40 +351,6 @@ static int tdb_nest_lock(struct tdb_context *tdb, tdb_off_t offset, int ltype, return 0; } -static int tdb_lock_and_recover(struct tdb_context *tdb) -{ -#if 0 /* FIXME */ - - int ret; - - /* We need to match locking order in transaction commit. */ - if (tdb_brlock(tdb, F_WRLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT)) { - return -1; - } - - if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT)) { - tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0); - return -1; - } - - ret = tdb_transaction_recover(tdb); - - tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1); - tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP, 0); - - return ret; -#else - abort(); - return -1; -#endif -} - -static bool tdb_needs_recovery(struct tdb_context *tdb) -{ - /* FIXME */ - return false; -} - static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype) { int ret = -1; @@ -390,14 +389,12 @@ static int tdb_nest_unlock(struct tdb_context *tdb, tdb_off_t off, int ltype) return ret; } -#if 0 /* get the transaction lock */ -int tdb_transaction_lock(struct tdb_context *tdb, int ltype, - enum tdb_lock_flags lockflags) +int tdb_transaction_lock(struct tdb_context *tdb, int ltype) { - return tdb_nest_lock(tdb, TRANSACTION_LOCK, ltype, lockflags); + return tdb_nest_lock(tdb, TDB_TRANSACTION_LOCK, ltype, TDB_LOCK_WAIT); } /* @@ -405,9 +402,8 @@ int tdb_transaction_lock(struct tdb_context *tdb, int ltype, */ int tdb_transaction_unlock(struct tdb_context *tdb, int ltype) { - return tdb_nest_unlock(tdb, TRANSACTION_LOCK, ltype, false); + return tdb_nest_unlock(tdb, TDB_TRANSACTION_LOCK, ltype); } -#endif /* We only need to lock individual bytes, but Linux merges consecutive locks * so we lock in contiguous ranges. */ @@ -474,7 +470,7 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype, return -1; } - if (tdb_has_locks(tdb)) { + if (tdb_has_hash_locks(tdb)) { /* can't combine global and chain locks */ tdb->ecode = TDB_ERR_LOCK; tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, @@ -522,7 +518,7 @@ again: tdb->allrecord_lock.off = upgradable; /* Now check for needing recovery. */ - if (unlikely(tdb_needs_recovery(tdb))) { + if (!(flags & TDB_LOCK_NOCHECK) && unlikely(tdb_needs_recovery(tdb))) { tdb_allrecord_unlock(tdb, ltype); if (tdb_lock_and_recover(tdb) == -1) { return -1; @@ -533,9 +529,9 @@ again: return 0; } -int tdb_lock_open(struct tdb_context *tdb) +int tdb_lock_open(struct tdb_context *tdb, enum tdb_lock_flags flags) { - return tdb_nest_lock(tdb, TDB_OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT); + return tdb_nest_lock(tdb, TDB_OPEN_LOCK, F_WRLCK, flags); } void tdb_unlock_open(struct tdb_context *tdb) @@ -543,9 +539,16 @@ void tdb_unlock_open(struct tdb_context *tdb) tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK); } +bool tdb_has_open_lock(struct tdb_context *tdb) +{ + return find_nestlock(tdb, TDB_OPEN_LOCK) != NULL; +} + int tdb_lock_expand(struct tdb_context *tdb, int ltype) { - return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, TDB_LOCK_WAIT); + /* Lock doesn't protect data, so don't check (we recurse if we do!) */ + return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype, + TDB_LOCK_WAIT | TDB_LOCK_NOCHECK); } void tdb_unlock_expand(struct tdb_context *tdb, int ltype) @@ -598,9 +601,17 @@ bool tdb_has_expansion_lock(struct tdb_context *tdb) return find_nestlock(tdb, TDB_EXPANSION_LOCK) != NULL; } -bool tdb_has_locks(struct tdb_context *tdb) +bool tdb_has_hash_locks(struct tdb_context *tdb) { - return tdb->allrecord_lock.count || tdb->num_lockrecs; + unsigned int i; + + for (i=0; inum_lockrecs; i++) { + if (tdb->lockrecs[i].off >= TDB_HASH_LOCK_START + && tdb->lockrecs[i].off < (TDB_HASH_LOCK_START + + TDB_HASH_LOCK_RANGE)) + return true; + } + return false; } #if 0 diff --git a/ccan/tdb2/private.h b/ccan/tdb2/private.h index 333eb20e..46a0e051 100644 --- a/ccan/tdb2/private.h +++ b/ccan/tdb2/private.h @@ -67,8 +67,8 @@ typedef uint64_t tdb_off_t; #define TDB_FREE_MAGIC ((uint64_t)0xFE) #define TDB_COALESCING_MAGIC ((uint64_t)0xFD) #define TDB_HASH_MAGIC (0xA1ABE11A01092008ULL) -#define TDB_RECOVERY_MAGIC (0xf53bc0e7U) -#define TDB_RECOVERY_INVALID_MAGIC (0x0) +#define TDB_RECOVERY_MAGIC (0xf53bc0e7ad124589ULL) +#define TDB_RECOVERY_INVALID_MAGIC (0x0ULL) #define TDB_OFF_ERR ((tdb_off_t)-1) @@ -189,6 +189,16 @@ static inline uint64_t frec_flist(const struct tdb_free_record *f) return f->magic_and_meta & ((1ULL << (64 - TDB_OFF_UPPER_STEAL)) - 1); } +struct tdb_recovery_record { + uint64_t magic; + /* Length of record. */ + uint64_t max_len; + /* Length used. */ + uint64_t len; + /* Old length of file before transaction. */ + uint64_t eof; +}; + /* this is stored at the front of every database */ struct tdb_header { char magic_food[64]; /* for /etc/magic */ @@ -197,8 +207,9 @@ struct tdb_header { uint64_t hash_test; /* result of hashing HASH_MAGIC. */ uint64_t hash_seed; /* "random" seed written at creation time. */ tdb_off_t free_list; /* (First) free list. */ + tdb_off_t recovery; /* Transaction recovery area. */ - tdb_off_t reserved[27]; + tdb_off_t reserved[26]; /* Top level hash table. */ tdb_off_t hashtable[1ULL << TDB_TOPLEVEL_HASH_BITS]; @@ -248,6 +259,8 @@ enum tdb_lock_flags { TDB_LOCK_WAIT = 1, /* If set, don't log an error on failure. */ TDB_LOCK_PROBE = 2, + /* If set, don't check for recovery (used by recovery code). */ + TDB_LOCK_NOCHECK = 4, }; struct tdb_lock_type { @@ -456,23 +469,31 @@ int tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off, enum tdb_lock_flags waitflag); void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off); -/* Do we have any locks? */ -bool tdb_has_locks(struct tdb_context *tdb); +/* Serialize transaction start. */ +int tdb_transaction_lock(struct tdb_context *tdb, int ltype); +int tdb_transaction_unlock(struct tdb_context *tdb, int ltype); + +/* Do we have any hash locks (ie. via tdb_chainlock) ? */ +bool tdb_has_hash_locks(struct tdb_context *tdb); /* Lock entire database. */ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype, enum tdb_lock_flags flags, bool upgradable); int tdb_allrecord_unlock(struct tdb_context *tdb, int ltype); +int tdb_allrecord_upgrade(struct tdb_context *tdb); /* Serialize db open. */ -int tdb_lock_open(struct tdb_context *tdb); +int tdb_lock_open(struct tdb_context *tdb, enum tdb_lock_flags flags); void tdb_unlock_open(struct tdb_context *tdb); +bool tdb_has_open_lock(struct tdb_context *tdb); /* Serialize db expand. */ int tdb_lock_expand(struct tdb_context *tdb, int ltype); void tdb_unlock_expand(struct tdb_context *tdb, int ltype); bool tdb_has_expansion_lock(struct tdb_context *tdb); +/* If it needs recovery, grab all the locks and do it. */ +int tdb_lock_and_recover(struct tdb_context *tdb); /* traverse.c: */ int first_in_hash(struct tdb_context *tdb, int ltype, @@ -482,6 +503,9 @@ int next_in_hash(struct tdb_context *tdb, int ltype, struct traverse_info *tinfo, TDB_DATA *kbuf, size_t *dlen); +/* transaction.c: */ +int tdb_transaction_recover(struct tdb_context *tdb); +bool tdb_needs_recovery(struct tdb_context *tdb); #if 0 /* Low-level locking primitives. */ diff --git a/ccan/tdb2/tdb.c b/ccan/tdb2/tdb.c index cffb4130..d070a0ef 100644 --- a/ccan/tdb2/tdb.c +++ b/ccan/tdb2/tdb.c @@ -103,6 +103,7 @@ static int tdb_new_database(struct tdb_context *tdb, sizeof(newdb.hdr.hash_test), newdb.hdr.hash_seed, tdb->hash_priv); + newdb.hdr.recovery = 0; memset(newdb.hdr.reserved, 0, sizeof(newdb.hdr.reserved)); /* Initial hashes are empty. */ memset(newdb.hdr.hashtable, 0, sizeof(newdb.hdr.hashtable)); @@ -246,7 +247,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags, fcntl(tdb->fd, F_SETFD, v | FD_CLOEXEC); /* ensure there is only one process initialising at once */ - if (tdb_lock_open(tdb) == -1) { + if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) { tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, "tdb_open: failed to get open lock on %s: %s\n", name, strerror(errno)); @@ -314,6 +315,12 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags, /* This make sure we have current map_size and mmap. */ tdb->methods->oob(tdb, tdb->map_size + 1, true); + /* Now it's fully formed, recover if necessary. */ + if (tdb_needs_recovery(tdb) && tdb_lock_and_recover(tdb) == -1) { + errno = EIO; + goto fail; + } + if (tdb_flist_init(tdb) == -1) goto fail; diff --git a/ccan/tdb2/tdb2.h b/ccan/tdb2/tdb2.h index ee20a28d..5050b3fd 100644 --- a/ccan/tdb2/tdb2.h +++ b/ccan/tdb2/tdb2.h @@ -142,6 +142,11 @@ int tdb_check(struct tdb_context *tdb, enum TDB_ERROR tdb_error(struct tdb_context *tdb); const char *tdb_errorstr(struct tdb_context *tdb); +int tdb_transaction_start(struct tdb_context *tdb); +void tdb_transaction_cancel(struct tdb_context *tdb); +int tdb_transaction_prepare_commit(struct tdb_context *tdb); +int tdb_transaction_commit(struct tdb_context *tdb); + char *tdb_summary(struct tdb_context *tdb, enum tdb_summary_flags flags); extern struct tdb_data tdb_null; diff --git a/ccan/tdb2/test/external-agent.c b/ccan/tdb2/test/external-agent.c index bcf5d215..fa5ca8b4 100644 --- a/ccan/tdb2/test/external-agent.c +++ b/ccan/tdb2/test/external-agent.c @@ -1,5 +1,6 @@ #include "external-agent.h" #include "logging.h" +#include "lock-tracking.h" #include #include #include @@ -16,11 +17,6 @@ static struct tdb_context *tdb; -#if 1 /* FIXME */ -static unsigned int locking_would_block = 0; -static bool nonblocking_locks = false; -#endif - static enum agent_return do_operation(enum operation op, const char *name) { TDB_DATA k; @@ -32,6 +28,8 @@ static enum agent_return do_operation(enum operation op, const char *name) return OTHER_FAILURE; } + diag("external: %s", operation_name(op)); + k.dptr = (void *)name; k.dsize = strlen(name); @@ -46,6 +44,7 @@ static enum agent_return do_operation(enum operation op, const char *name) if (!tdb) { if (!locking_would_block) diag("Opening tdb gave %s", strerror(errno)); + forget_locking(); ret = OTHER_FAILURE; } else ret = SUCCESS; @@ -68,7 +67,6 @@ static enum agent_return do_operation(enum operation op, const char *name) case STORE: ret = tdb_store(tdb, k, k, 0) == 0 ? SUCCESS : OTHER_FAILURE; break; -#if 0 /* FIXME */ case TRANSACTION_START: ret = tdb_transaction_start(tdb) == 0 ? SUCCESS : OTHER_FAILURE; break; @@ -78,7 +76,6 @@ static enum agent_return do_operation(enum operation op, const char *name) case NEEDS_RECOVERY: ret = tdb_needs_recovery(tdb) ? SUCCESS : FAILED; break; -#endif case CHECK: ret = tdb_check(tdb, NULL, NULL) == 0 ? SUCCESS : OTHER_FAILURE; break; @@ -183,11 +180,9 @@ const char *operation_name(enum operation op) case FETCH: return "FETCH"; case STORE: return "STORE"; case CHECK: return "CHECK"; -#if 0 case TRANSACTION_START: return "TRANSACTION_START"; case TRANSACTION_COMMIT: return "TRANSACTION_COMMIT"; case NEEDS_RECOVERY: return "NEEDS_RECOVERY"; -#endif case CLOSE: return "CLOSE"; } return "**INVALID**"; diff --git a/ccan/tdb2/test/external-agent.h b/ccan/tdb2/test/external-agent.h index 6d5c5306..ad537d5d 100644 --- a/ccan/tdb2/test/external-agent.h +++ b/ccan/tdb2/test/external-agent.h @@ -7,11 +7,9 @@ enum operation { OPEN, FETCH, STORE, -#if 0 TRANSACTION_START, TRANSACTION_COMMIT, NEEDS_RECOVERY, -#endif CHECK, CLOSE, }; diff --git a/ccan/tdb2/test/lock-tracking.c b/ccan/tdb2/test/lock-tracking.c new file mode 100644 index 00000000..05dba32f --- /dev/null +++ b/ccan/tdb2/test/lock-tracking.c @@ -0,0 +1,147 @@ +/* We save the locks so we can reaquire them. */ +#include +#include +#include +#include +#include +#include +#include "lock-tracking.h" + +struct lock { + struct lock *next; + unsigned int off; + unsigned int len; + int type; +}; +static struct lock *locks; +int locking_errors = 0; +bool suppress_lockcheck = false; +bool nonblocking_locks; +int locking_would_block = 0; +void (*unlock_callback)(int fd); + +int fcntl_with_lockcheck(int fd, int cmd, ... /* arg */ ) +{ + va_list ap; + int ret, arg3; + struct flock *fl; + bool may_block = false; + + if (cmd != F_SETLK && cmd != F_SETLKW) { + /* This may be totally bogus, but we don't know in general. */ + va_start(ap, cmd); + arg3 = va_arg(ap, int); + va_end(ap); + + return fcntl(fd, cmd, arg3); + } + + va_start(ap, cmd); + fl = va_arg(ap, struct flock *); + va_end(ap); + + if (cmd == F_SETLKW && nonblocking_locks) { + cmd = F_SETLK; + may_block = true; + } + ret = fcntl(fd, cmd, fl); + + /* Detect when we failed, but might have been OK if we waited. */ + if (may_block && ret == -1 && (errno == EAGAIN || errno == EACCES)) { + locking_would_block++; + } + + if (fl->l_type == F_UNLCK) { + struct lock **l; + struct lock *old = NULL; + + for (l = &locks; *l; l = &(*l)->next) { + if ((*l)->off == fl->l_start + && (*l)->len == fl->l_len) { + if (ret == 0) { + old = *l; + *l = (*l)->next; + free(old); + } + break; + } + } + if (!old && !suppress_lockcheck) { + diag("Unknown unlock %u@%u - %i", + (int)fl->l_len, (int)fl->l_start, ret); + locking_errors++; + } + } else { + struct lock *new, *i; + unsigned int fl_end = fl->l_start + fl->l_len; + if (fl->l_len == 0) + fl_end = (unsigned int)-1; + + /* Check for overlaps: we shouldn't do this. */ + for (i = locks; i; i = i->next) { + unsigned int i_end = i->off + i->len; + if (i->len == 0) + i_end = (unsigned int)-1; + + if (fl->l_start >= i->off && fl->l_start < i_end) + break; + if (fl_end > i->off && fl_end < i_end) + break; + + /* tdb_allrecord_lock does this, handle adjacent: */ + if (fl->l_start > TDB_HASH_LOCK_START + && fl->l_start == i_end && fl->l_type == i->type) { + if (ret == 0) { + i->len = fl->l_len + ? i->len + fl->l_len + : 0; + } + goto done; + } + } + if (i) { + /* Special case: upgrade of allrecord lock. */ + if (i->type == F_RDLCK && fl->l_type == F_WRLCK + && i->off == TDB_HASH_LOCK_START + && fl->l_start == TDB_HASH_LOCK_START + && i->len == 0 + && fl->l_len == 0) { + if (ret == 0) + i->type = F_WRLCK; + goto done; + } + if (!suppress_lockcheck) { + diag("%s lock %u@%u overlaps %u@%u", + fl->l_type == F_WRLCK ? "write" : "read", + (int)fl->l_len, (int)fl->l_start, + i->len, (int)i->off); + locking_errors++; + } + } + + if (ret == 0) { + new = malloc(sizeof *new); + new->off = fl->l_start; + new->len = fl->l_len; + new->type = fl->l_type; + new->next = locks; + locks = new; + } + } +done: + if (ret == 0 && fl->l_type == F_UNLCK && unlock_callback) + unlock_callback(fd); + return ret; +} + +unsigned int forget_locking(void) +{ + unsigned int num = 0; + while (locks) { + struct lock *next = locks->next; + free(locks); + locks = next; + num++; + } + return num; +} diff --git a/ccan/tdb2/test/lock-tracking.h b/ccan/tdb2/test/lock-tracking.h new file mode 100644 index 00000000..f2c9c446 --- /dev/null +++ b/ccan/tdb2/test/lock-tracking.h @@ -0,0 +1,25 @@ +#ifndef LOCK_TRACKING_H +#define LOCK_TRACKING_H +#include + +/* Set this if you want a callback after fnctl unlock. */ +extern void (*unlock_callback)(int fd); + +/* Replacement fcntl. */ +int fcntl_with_lockcheck(int fd, int cmd, ... /* arg */ ); + +/* Discard locking info: returns number of locks outstanding. */ +unsigned int forget_locking(void); + +/* Number of errors in locking. */ +extern int locking_errors; + +/* Suppress lock checking. */ +extern bool suppress_lockcheck; + +/* Make all locks non-blocking. */ +extern bool nonblocking_locks; + +/* Number of times we failed a lock because we made it non-blocking. */ +extern int locking_would_block; +#endif /* LOCK_TRACKING_H */ diff --git a/ccan/tdb2/test/run-001-encode.c b/ccan/tdb2/test/run-001-encode.c index 7a4fc06e..b209ed82 100644 --- a/ccan/tdb2/test/run-001-encode.c +++ b/ccan/tdb2/test/run-001-encode.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-001-fls.c b/ccan/tdb2/test/run-001-fls.c index d5f24925..a54d90d6 100644 --- a/ccan/tdb2/test/run-001-fls.c +++ b/ccan/tdb2/test/run-001-fls.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include diff --git a/ccan/tdb2/test/run-01-new_database.c b/ccan/tdb2/test/run-01-new_database.c index 6b82d576..ea385b60 100644 --- a/ccan/tdb2/test/run-01-new_database.c +++ b/ccan/tdb2/test/run-01-new_database.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-02-expand.c b/ccan/tdb2/test/run-02-expand.c index aa5d5679..06e06ac9 100644 --- a/ccan/tdb2/test/run-02-expand.c +++ b/ccan/tdb2/test/run-02-expand.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-03-coalesce.c b/ccan/tdb2/test/run-03-coalesce.c index c4014209..5d55577c 100644 --- a/ccan/tdb2/test/run-03-coalesce.c +++ b/ccan/tdb2/test/run-03-coalesce.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "logging.h" #include "layout.h" @@ -88,7 +89,7 @@ int main(int argc, char *argv[]) /* Lock and coalesce. */ ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0); ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) == 1); - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024 + sizeof(struct tdb_used_record) + 2048); ok1(tdb_check(tdb, NULL, NULL) == 0); @@ -110,7 +111,7 @@ int main(int argc, char *argv[]) /* Lock and coalesce. */ ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0); ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) == 1); - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024 + sizeof(struct tdb_used_record) + 512); ok1(tdb_check(tdb, NULL, NULL) == 0); @@ -133,7 +134,7 @@ int main(int argc, char *argv[]) /* Lock and coalesce. */ ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0); ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024) == 1); - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024 + sizeof(struct tdb_used_record) + 512 + sizeof(struct tdb_used_record) + 256); diff --git a/ccan/tdb2/test/run-04-basichash.c b/ccan/tdb2/test/run-04-basichash.c index d5353398..491c853f 100644 --- a/ccan/tdb2/test/run-04-basichash.c +++ b/ccan/tdb2/test/run-04-basichash.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-10-simple-store.c b/ccan/tdb2/test/run-10-simple-store.c index 35398fb0..097532a2 100644 --- a/ccan/tdb2/test/run-10-simple-store.c +++ b/ccan/tdb2/test/run-10-simple-store.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-11-simple-fetch.c b/ccan/tdb2/test/run-11-simple-fetch.c index 23395d2c..7225d88d 100644 --- a/ccan/tdb2/test/run-11-simple-fetch.c +++ b/ccan/tdb2/test/run-11-simple-fetch.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-12-store.c b/ccan/tdb2/test/run-12-store.c index fd53a92b..cf4f4a43 100644 --- a/ccan/tdb2/test/run-12-store.c +++ b/ccan/tdb2/test/run-12-store.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-13-delete.c b/ccan/tdb2/test/run-13-delete.c index 2f40d55a..ae01e38b 100644 --- a/ccan/tdb2/test/run-13-delete.c +++ b/ccan/tdb2/test/run-13-delete.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "logging.h" @@ -159,7 +160,7 @@ int main(int argc, char *argv[]) /* Check mixed bitpattern. */ test_val(tdb, 0x123456789ABCDEF0ULL); - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); tdb_close(tdb); /* Deleting these entries in the db gave problems. */ diff --git a/ccan/tdb2/test/run-15-append.c b/ccan/tdb2/test/run-15-append.c index 51d4ddef..fe7a24e0 100644 --- a/ccan/tdb2/test/run-15-append.c +++ b/ccan/tdb2/test/run-15-append.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include "logging.h" @@ -67,7 +68,7 @@ int main(int argc, char *argv[]) moves++; oldoff = newoff; } - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); /* We should increase by 50% each time... */ ok(moves <= ilog64(j / SIZE_STEP)*2, "Moved %u times", moves); tdb_close(tdb); @@ -98,7 +99,7 @@ int main(int argc, char *argv[]) moves++; oldoff = newoff; } - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); /* We should increase by 50% each time... */ ok(moves <= ilog64(j / SIZE_STEP)*2, "Moved %u times", moves); tdb_close(tdb); @@ -120,7 +121,7 @@ int main(int argc, char *argv[]) ok1(data.dsize == MAX_SIZE); ok1(memcmp(data.dptr, buffer, data.dsize) == 0); free(data.dptr); - ok1(!tdb_has_locks(tdb)); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); tdb_close(tdb); } diff --git a/ccan/tdb2/test/run-20-growhash.c b/ccan/tdb2/test/run-20-growhash.c index adbe733e..160f37de 100644 --- a/ccan/tdb2/test/run-20-growhash.c +++ b/ccan/tdb2/test/run-20-growhash.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-30-exhaust-before-expand.c b/ccan/tdb2/test/run-30-exhaust-before-expand.c index e2e27295..5a5931a8 100644 --- a/ccan/tdb2/test/run-30-exhaust-before-expand.c +++ b/ccan/tdb2/test/run-30-exhaust-before-expand.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-50-multiple-freelists.c b/ccan/tdb2/test/run-50-multiple-freelists.c index 28bf2252..fd709730 100644 --- a/ccan/tdb2/test/run-50-multiple-freelists.c +++ b/ccan/tdb2/test/run-50-multiple-freelists.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "logging.h" #include "layout.h" diff --git a/ccan/tdb2/test/run-55-transaction.c b/ccan/tdb2/test/run-55-transaction.c new file mode 100644 index 00000000..4c0dcabc --- /dev/null +++ b/ccan/tdb2/test/run-55-transaction.c @@ -0,0 +1,73 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "logging.h" + +int main(int argc, char *argv[]) +{ + unsigned int i; + struct tdb_context *tdb; + unsigned char *buffer; + int flags[] = { TDB_DEFAULT, TDB_NOMMAP, + TDB_CONVERT, TDB_NOMMAP|TDB_CONVERT }; + struct tdb_data key = { (unsigned char *)"key", 3 }; + struct tdb_data data; + + buffer = malloc(1000); + for (i = 0; i < 1000; i++) + buffer[i] = i; + + plan_tests(sizeof(flags) / sizeof(flags[0]) * 18 + 1); + + for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) { + tdb = tdb_open("run-55-transaction.tdb", flags[i], + O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr); + ok1(tdb); + if (!tdb) + continue; + + ok1(tdb_transaction_start(tdb) == 0); + data.dptr = buffer; + data.dsize = 1000; + ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0); + data = tdb_fetch(tdb, key); + ok1(data.dsize == 1000); + ok1(memcmp(data.dptr, buffer, data.dsize) == 0); + free(data.dptr); + + /* Cancelling a transaction means no store */ + tdb_transaction_cancel(tdb); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); + ok1(tdb_check(tdb, NULL, NULL) == 0); + data = tdb_fetch(tdb, key); + ok1(data.dsize == 0); + ok1(data.dptr == NULL); + + /* Commit the transaction. */ + ok1(tdb_transaction_start(tdb) == 0); + data.dptr = buffer; + data.dsize = 1000; + ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0); + data = tdb_fetch(tdb, key); + ok1(data.dsize == 1000); + ok1(memcmp(data.dptr, buffer, data.dsize) == 0); + free(data.dptr); + ok1(tdb_transaction_commit(tdb) == 0); + ok1(tdb->allrecord_lock.count == 0 && tdb->num_lockrecs == 0); + ok1(tdb_check(tdb, NULL, NULL) == 0); + data = tdb_fetch(tdb, key); + ok1(data.dsize == 1000); + ok1(memcmp(data.dptr, buffer, data.dsize) == 0); + free(data.dptr); + + tdb_close(tdb); + } + + ok1(tap_log_messages == 0); + return exit_status(); +} diff --git a/ccan/tdb2/test/run-56-open-during-transaction.c b/ccan/tdb2/test/run-56-open-during-transaction.c new file mode 100644 index 00000000..80a0b43c --- /dev/null +++ b/ccan/tdb2/test/run-56-open-during-transaction.c @@ -0,0 +1,176 @@ +#define _XOPEN_SOURCE 500 +#include +#include "lock-tracking.h" + +static ssize_t pwrite_check(int fd, const void *buf, size_t count, off_t offset); +static ssize_t write_check(int fd, const void *buf, size_t count); +static int ftruncate_check(int fd, off_t length); + +#define pwrite pwrite_check +#define write write_check +#define fcntl fcntl_with_lockcheck +#define ftruncate ftruncate_check + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "external-agent.h" +#include "logging.h" + +static struct agent *agent; +static bool opened; +static int errors = 0; +#define TEST_DBNAME "run-56-open-during-transaction.tdb" + +#undef write +#undef pwrite +#undef fcntl +#undef ftruncate + +static bool is_same(const char *snapshot, const char *latest, off_t len) +{ + unsigned i; + + for (i = 0; i < len; i++) { + if (snapshot[i] != latest[i]) + return false; + } + return true; +} + +static bool compare_file(int fd, const char *snapshot, off_t snapshot_len) +{ + char *contents; + bool same; + + /* over-length read serves as length check. */ + contents = malloc(snapshot_len+1); + same = pread(fd, contents, snapshot_len+1, 0) == snapshot_len + && is_same(snapshot, contents, snapshot_len); + free(contents); + return same; +} + +static void check_file_intact(int fd) +{ + enum agent_return ret; + struct stat st; + char *contents; + + fstat(fd, &st); + contents = malloc(st.st_size); + if (pread(fd, contents, st.st_size, 0) != st.st_size) { + diag("Read fail"); + errors++; + return; + } + + /* Ask agent to open file. */ + ret = external_agent_operation(agent, OPEN, TEST_DBNAME); + + /* It's OK to open it, but it must not have changed! */ + if (!compare_file(fd, contents, st.st_size)) { + diag("Agent changed file after opening %s", + agent_return_name(ret)); + errors++; + } + + if (ret == SUCCESS) { + ret = external_agent_operation(agent, CLOSE, NULL); + if (ret != SUCCESS) { + diag("Agent failed to close tdb: %s", + agent_return_name(ret)); + errors++; + } + } else if (ret != WOULD_HAVE_BLOCKED) { + diag("Agent opening file gave %s", + agent_return_name(ret)); + errors++; + } + + free(contents); +} + +static void after_unlock(int fd) +{ + if (opened) + check_file_intact(fd); +} + +static ssize_t pwrite_check(int fd, + const void *buf, size_t count, off_t offset) +{ + if (opened) + check_file_intact(fd); + + return pwrite(fd, buf, count, offset); +} + +static ssize_t write_check(int fd, const void *buf, size_t count) +{ + if (opened) + check_file_intact(fd); + + return write(fd, buf, count); +} + +static int ftruncate_check(int fd, off_t length) +{ + if (opened) + check_file_intact(fd); + + return ftruncate(fd, length); + +} + +int main(int argc, char *argv[]) +{ + const int flags[] = { TDB_DEFAULT, + TDB_NOMMAP, + TDB_CONVERT, + TDB_CONVERT | TDB_NOMMAP }; + int i; + struct tdb_context *tdb; + TDB_DATA key, data; + + plan_tests(20); + agent = prepare_external_agent(); + if (!agent) + err(1, "preparing agent"); + + unlock_callback = after_unlock; + for (i = 0; i < sizeof(flags)/sizeof(flags[0]); i++) { + diag("Test with %s and %s\n", + (flags[i] & TDB_CONVERT) ? "CONVERT" : "DEFAULT", + (flags[i] & TDB_NOMMAP) ? "no mmap" : "mmap"); + unlink(TEST_DBNAME); + tdb = tdb_open(TEST_DBNAME, flags[i], + O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr); + ok1(tdb); + + opened = true; + ok1(tdb_transaction_start(tdb) == 0); + key.dsize = strlen("hi"); + key.dptr = (void *)"hi"; + data.dptr = (void *)"world"; + data.dsize = strlen("world"); + + ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0); + ok1(tdb_transaction_commit(tdb) == 0); + ok(!errors, "We had %u open errors", errors); + + opened = false; + tdb_close(tdb); + } + + return exit_status(); +} diff --git a/ccan/tdb2/test/run-57-die-during-transaction.c b/ccan/tdb2/test/run-57-die-during-transaction.c new file mode 100644 index 00000000..4711e50e --- /dev/null +++ b/ccan/tdb2/test/run-57-die-during-transaction.c @@ -0,0 +1,221 @@ +#define _XOPEN_SOURCE 500 +#include +#include "lock-tracking.h" +static ssize_t pwrite_check(int fd, const void *buf, size_t count, off_t offset); +static ssize_t write_check(int fd, const void *buf, size_t count); +static int ftruncate_check(int fd, off_t length); + +#define pwrite pwrite_check +#define write write_check +#define fcntl fcntl_with_lockcheck +#define ftruncate ftruncate_check + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "external-agent.h" +#include "logging.h" + +#undef write +#undef pwrite +#undef fcntl +#undef ftruncate + +static bool in_transaction; +static int target, current; +static jmp_buf jmpbuf; +#define TEST_DBNAME "run-57-die-during-transaction.tdb" +#define KEY_STRING "helloworld" + +static void maybe_die(int fd) +{ + if (in_transaction && current++ == target) { + longjmp(jmpbuf, 1); + } +} + +static ssize_t pwrite_check(int fd, + const void *buf, size_t count, off_t offset) +{ + ssize_t ret; + + maybe_die(fd); + + ret = pwrite(fd, buf, count, offset); + if (ret != count) + return ret; + + maybe_die(fd); + return ret; +} + +static ssize_t write_check(int fd, const void *buf, size_t count) +{ + ssize_t ret; + + maybe_die(fd); + + ret = write(fd, buf, count); + if (ret != count) + return ret; + + maybe_die(fd); + return ret; +} + +static int ftruncate_check(int fd, off_t length) +{ + int ret; + + maybe_die(fd); + + ret = ftruncate(fd, length); + + maybe_die(fd); + return ret; +} + +static bool test_death(enum operation op, struct agent *agent) +{ + struct tdb_context *tdb = NULL; + TDB_DATA key; + enum agent_return ret; + int needed_recovery = 0; + + current = target = 0; +reset: + unlink(TEST_DBNAME); + tdb = tdb_open(TEST_DBNAME, TDB_NOMMAP, + O_CREAT|O_TRUNC|O_RDWR, 0600, &tap_log_attr); + + if (setjmp(jmpbuf) != 0) { + /* We're partway through. Simulate our death. */ + close(tdb->fd); + forget_locking(); + in_transaction = false; + + ret = external_agent_operation(agent, NEEDS_RECOVERY, ""); + if (ret == SUCCESS) + needed_recovery++; + else if (ret != FAILED) { + diag("Step %u agent NEEDS_RECOVERY = %s", current, + agent_return_name(ret)); + return false; + } + + ret = external_agent_operation(agent, op, KEY_STRING); + if (ret != SUCCESS) { + diag("Step %u op %s failed = %s", current, + operation_name(op), + agent_return_name(ret)); + return false; + } + + ret = external_agent_operation(agent, NEEDS_RECOVERY, ""); + if (ret != FAILED) { + diag("Still needs recovery after step %u = %s", + current, agent_return_name(ret)); + return false; + } + + ret = external_agent_operation(agent, CHECK, ""); + if (ret != SUCCESS) { + diag("Step %u check failed = %s", current, + agent_return_name(ret)); + return false; + } + + ret = external_agent_operation(agent, CLOSE, ""); + if (ret != SUCCESS) { + diag("Step %u close failed = %s", current, + agent_return_name(ret)); + return false; + } + + /* Suppress logging as this tries to use closed fd. */ + suppress_logging = true; + suppress_lockcheck = true; + tdb_close(tdb); + suppress_logging = false; + suppress_lockcheck = false; + target++; + current = 0; + goto reset; + } + + /* Put key for agent to fetch. */ + key.dsize = strlen(KEY_STRING); + key.dptr = (void *)KEY_STRING; + if (tdb_store(tdb, key, key, TDB_INSERT) != 0) + return false; + + /* This is the key we insert in transaction. */ + key.dsize--; + + ret = external_agent_operation(agent, OPEN, TEST_DBNAME); + if (ret != SUCCESS) + errx(1, "Agent failed to open: %s", agent_return_name(ret)); + + ret = external_agent_operation(agent, FETCH, KEY_STRING); + if (ret != SUCCESS) + errx(1, "Agent failed find key: %s", agent_return_name(ret)); + + in_transaction = true; + if (tdb_transaction_start(tdb) != 0) + return false; + + if (tdb_store(tdb, key, key, TDB_INSERT) != 0) + return false; + + if (tdb_transaction_commit(tdb) != 0) + return false; + + in_transaction = false; + + /* We made it! */ + diag("Completed %u runs", current); + tdb_close(tdb); + ret = external_agent_operation(agent, CLOSE, ""); + if (ret != SUCCESS) { + diag("Step %u close failed = %s", current, + agent_return_name(ret)); + return false; + } + + ok1(needed_recovery); + ok1(locking_errors == 0); + ok1(forget_locking() == 0); + locking_errors = 0; + return true; +} + +int main(int argc, char *argv[]) +{ + enum operation ops[] = { FETCH, STORE, TRANSACTION_START }; + struct agent *agent; + int i; + + plan_tests(12); + unlock_callback = maybe_die; + + agent = prepare_external_agent(); + if (!agent) + err(1, "preparing agent"); + + for (i = 0; i < sizeof(ops)/sizeof(ops[0]); i++) { + diag("Testing %s after death", operation_name(ops[i])); + ok1(test_death(ops[i], agent)); + } + + return exit_status(); +} diff --git a/ccan/tdb2/test/run-firstkey-nextkey.c b/ccan/tdb2/test/run-firstkey-nextkey.c index db52c852..424b736c 100644 --- a/ccan/tdb2/test/run-firstkey-nextkey.c +++ b/ccan/tdb2/test/run-firstkey-nextkey.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-missing-entries.c b/ccan/tdb2/test/run-missing-entries.c index bd5127dd..e197143f 100644 --- a/ccan/tdb2/test/run-missing-entries.c +++ b/ccan/tdb2/test/run-missing-entries.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-record-expand.c b/ccan/tdb2/test/run-record-expand.c index 809b8d77..27d58ff3 100644 --- a/ccan/tdb2/test/run-record-expand.c +++ b/ccan/tdb2/test/run-record-expand.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-remap-in-read_traverse.c b/ccan/tdb2/test/run-remap-in-read_traverse.c index 77fdc9ef..8dd1bf83 100644 --- a/ccan/tdb2/test/run-remap-in-read_traverse.c +++ b/ccan/tdb2/test/run-remap-in-read_traverse.c @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "external-agent.h" #include "logging.h" diff --git a/ccan/tdb2/test/run-seed.c b/ccan/tdb2/test/run-seed.c index d78e863d..4f60cb5f 100644 --- a/ccan/tdb2/test/run-seed.c +++ b/ccan/tdb2/test/run-seed.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-simple-delete.c b/ccan/tdb2/test/run-simple-delete.c index d00bcbb9..deb7ebf2 100644 --- a/ccan/tdb2/test/run-simple-delete.c +++ b/ccan/tdb2/test/run-simple-delete.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-summary.c b/ccan/tdb2/test/run-summary.c index 450f0905..8ef5c80e 100644 --- a/ccan/tdb2/test/run-summary.c +++ b/ccan/tdb2/test/run-summary.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/test/run-tdb_errorstr.c b/ccan/tdb2/test/run-tdb_errorstr.c index dd8e7b0c..ad2df914 100644 --- a/ccan/tdb2/test/run-tdb_errorstr.c +++ b/ccan/tdb2/test/run-tdb_errorstr.c @@ -4,6 +4,7 @@ #include #include #include +#include #include int main(int argc, char *argv[]) diff --git a/ccan/tdb2/test/run-traverse.c b/ccan/tdb2/test/run-traverse.c index 97694a84..e3c2a895 100644 --- a/ccan/tdb2/test/run-traverse.c +++ b/ccan/tdb2/test/run-traverse.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "logging.h" diff --git a/ccan/tdb2/transaction.c b/ccan/tdb2/transaction.c new file mode 100644 index 00000000..53bcc21c --- /dev/null +++ b/ccan/tdb2/transaction.c @@ -0,0 +1,1191 @@ + /* + Unix SMB/CIFS implementation. + + trivial database library + + Copyright (C) Andrew Tridgell 2005 + Copyright (C) Rusty Russell 2010 + + ** NOTE! The following LGPL license applies to the tdb + ** library. This does NOT imply that all of Samba is released + ** under the LGPL + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see . +*/ + +#include "private.h" +#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0) + +/* + transaction design: + + - only allow a single transaction at a time per database. This makes + using the transaction API simpler, as otherwise the caller would + have to cope with temporary failures in transactions that conflict + with other current transactions + + - keep the transaction recovery information in the same file as the + database, using a special 'transaction recovery' record pointed at + by the header. This removes the need for extra journal files as + used by some other databases + + - dynamically allocated the transaction recover record, re-using it + for subsequent transactions. If a larger record is needed then + tdb_free() the old record to place it on the normal tdb freelist + before allocating the new record + + - during transactions, keep a linked list of writes all that have + been performed by intercepting all tdb_write() calls. The hooked + transaction versions of tdb_read() and tdb_write() check this + linked list and try to use the elements of the list in preference + to the real database. + + - don't allow any locks to be held when a transaction starts, + otherwise we can end up with deadlock (plus lack of lock nesting + in posix locks would mean the lock is lost) + + - if the caller gains a lock during the transaction but doesn't + release it then fail the commit + + - allow for nested calls to tdb_transaction_start(), re-using the + existing transaction record. If the inner transaction is cancelled + then a subsequent commit will fail + + - keep a mirrored copy of the tdb hash chain heads to allow for the + fast hash heads scan on traverse, updating the mirrored copy in + the transaction version of tdb_write + + - allow callers to mix transaction and non-transaction use of tdb, + although once a transaction is started then an exclusive lock is + gained until the transaction is committed or cancelled + + - the commit stategy involves first saving away all modified data + into a linearised buffer in the transaction recovery area, then + marking the transaction recovery area with a magic value to + indicate a valid recovery record. In total 4 fsync/msync calls are + needed per commit to prevent race conditions. It might be possible + to reduce this to 3 or even 2 with some more work. + + - check for a valid recovery record on open of the tdb, while the + open lock is held. Automatically recover from the transaction + recovery area if needed, then continue with the open as + usual. This allows for smooth crash recovery with no administrator + intervention. + + - if TDB_NOSYNC is passed to flags in tdb_open then transactions are + still available, but no transaction recovery area is used and no + fsync/msync calls are made. +*/ + + +/* + hold the context of any current transaction +*/ +struct tdb_transaction { + /* the original io methods - used to do IOs to the real db */ + const struct tdb_methods *io_methods; + + /* the list of transaction blocks. When a block is first + written to, it gets created in this list */ + uint8_t **blocks; + size_t num_blocks; + size_t last_block_size; /* number of valid bytes in the last block */ + + /* non-zero when an internal transaction error has + occurred. All write operations will then fail until the + transaction is ended */ + int transaction_error; + + /* when inside a transaction we need to keep track of any + nested tdb_transaction_start() calls, as these are allowed, + but don't create a new transaction */ + int nesting; + + /* set when a prepare has already occurred */ + bool prepared; + tdb_off_t magic_offset; + + /* old file size before transaction */ + tdb_len_t old_map_size; +}; + + +/* + read while in a transaction. We need to check first if the data is in our list + of transaction elements, then if not do a real read +*/ +static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, + tdb_len_t len) +{ + size_t blk; + + /* break it down into block sized ops */ + while (len + (off % getpagesize()) > getpagesize()) { + tdb_len_t len2 = getpagesize() - (off % getpagesize()); + if (transaction_read(tdb, off, buf, len2) != 0) { + return -1; + } + len -= len2; + off += len2; + buf = (void *)(len2 + (char *)buf); + } + + if (len == 0) { + return 0; + } + + blk = off / getpagesize(); + + /* see if we have it in the block list */ + if (tdb->transaction->num_blocks <= blk || + tdb->transaction->blocks[blk] == NULL) { + /* nope, do a real read */ + if (tdb->transaction->io_methods->read(tdb, off, buf, len) != 0) { + goto fail; + } + return 0; + } + + /* it is in the block list. Now check for the last block */ + if (blk == tdb->transaction->num_blocks-1) { + if (len > tdb->transaction->last_block_size) { + goto fail; + } + } + + /* now copy it out of this block */ + memcpy(buf, tdb->transaction->blocks[blk] + (off % getpagesize()), len); + return 0; + +fail: + tdb->ecode = TDB_ERR_IO; + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "transaction_read: failed at off=%llu len=%llu\n", + (long long)off, (long long)len); + tdb->transaction->transaction_error = 1; + return -1; +} + + +/* + write while in a transaction +*/ +static int transaction_write(struct tdb_context *tdb, tdb_off_t off, + const void *buf, tdb_len_t len) +{ + size_t blk; + + /* Only a commit is allowed on a prepared transaction */ + if (tdb->transaction->prepared) { + tdb->ecode = TDB_ERR_EINVAL; + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "transaction_write: transaction already prepared," + " write not allowed\n"); + tdb->transaction->transaction_error = 1; + return -1; + } + + /* break it up into block sized chunks */ + while (len + (off % getpagesize()) > getpagesize()) { + tdb_len_t len2 = getpagesize() - (off % getpagesize()); + if (transaction_write(tdb, off, buf, len2) != 0) { + return -1; + } + len -= len2; + off += len2; + if (buf != NULL) { + buf = (const void *)(len2 + (const char *)buf); + } + } + + if (len == 0) { + return 0; + } + + blk = off / getpagesize(); + off = off % getpagesize(); + + if (tdb->transaction->num_blocks <= blk) { + uint8_t **new_blocks; + /* expand the blocks array */ + if (tdb->transaction->blocks == NULL) { + new_blocks = (uint8_t **)malloc( + (blk+1)*sizeof(uint8_t *)); + } else { + new_blocks = (uint8_t **)realloc( + tdb->transaction->blocks, + (blk+1)*sizeof(uint8_t *)); + } + if (new_blocks == NULL) { + tdb->ecode = TDB_ERR_OOM; + goto fail; + } + memset(&new_blocks[tdb->transaction->num_blocks], 0, + (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *)); + tdb->transaction->blocks = new_blocks; + tdb->transaction->num_blocks = blk+1; + tdb->transaction->last_block_size = 0; + } + + /* allocate and fill a block? */ + if (tdb->transaction->blocks[blk] == NULL) { + tdb->transaction->blocks[blk] = (uint8_t *)calloc(getpagesize(), 1); + if (tdb->transaction->blocks[blk] == NULL) { + tdb->ecode = TDB_ERR_OOM; + tdb->transaction->transaction_error = 1; + return -1; + } + if (tdb->transaction->old_map_size > blk * getpagesize()) { + tdb_len_t len2 = getpagesize(); + if (len2 + (blk * getpagesize()) > tdb->transaction->old_map_size) { + len2 = tdb->transaction->old_map_size - (blk * getpagesize()); + } + if (tdb->transaction->io_methods->read(tdb, blk * getpagesize(), + tdb->transaction->blocks[blk], + len2) != 0) { + SAFE_FREE(tdb->transaction->blocks[blk]); + goto fail; + } + if (blk == tdb->transaction->num_blocks-1) { + tdb->transaction->last_block_size = len2; + } + } + } + + /* overwrite part of an existing block */ + if (buf == NULL) { + memset(tdb->transaction->blocks[blk] + off, 0, len); + } else { + memcpy(tdb->transaction->blocks[blk] + off, buf, len); + } + if (blk == tdb->transaction->num_blocks-1) { + if (len + off > tdb->transaction->last_block_size) { + tdb->transaction->last_block_size = len + off; + } + } + + return 0; + +fail: + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "transaction_write: failed at off=%llu len=%llu\n", + (long long)((blk*getpagesize()) + off), + (long long)len); + tdb->transaction->transaction_error = 1; + return -1; +} + + +/* + write while in a transaction - this varient never expands the transaction blocks, it only + updates existing blocks. This means it cannot change the recovery size +*/ +static void transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, + const void *buf, tdb_len_t len) +{ + size_t blk; + + /* break it up into block sized chunks */ + while (len + (off % getpagesize()) > getpagesize()) { + tdb_len_t len2 = getpagesize() - (off % getpagesize()); + transaction_write_existing(tdb, off, buf, len2); + len -= len2; + off += len2; + if (buf != NULL) { + buf = (const void *)(len2 + (const char *)buf); + } + } + + if (len == 0) { + return; + } + + blk = off / getpagesize(); + off = off % getpagesize(); + + if (tdb->transaction->num_blocks <= blk || + tdb->transaction->blocks[blk] == NULL) { + return; + } + + if (blk == tdb->transaction->num_blocks-1 && + off + len > tdb->transaction->last_block_size) { + if (off >= tdb->transaction->last_block_size) { + return; + } + len = tdb->transaction->last_block_size - off; + } + + /* overwrite part of an existing block */ + memcpy(tdb->transaction->blocks[blk] + off, buf, len); +} + + +/* + out of bounds check during a transaction +*/ +static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, bool probe) +{ + if (len <= tdb->map_size) { + return 0; + } + tdb->ecode = TDB_ERR_IO; + return -1; +} + +/* + transaction version of tdb_expand(). +*/ +static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition) +{ + /* add a write to the transaction elements, so subsequent + reads see the zero data */ + if (transaction_write(tdb, tdb->map_size, NULL, addition) != 0) { + return -1; + } + tdb->map_size += addition; + return 0; +} + +static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off, + size_t len) +{ + /* FIXME */ + return NULL; +} + +static const struct tdb_methods transaction_methods = { + transaction_read, + transaction_write, + transaction_oob, + transaction_expand_file, + transaction_direct, +}; + +/* + sync to disk +*/ +static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length) +{ + if (tdb->flags & TDB_NOSYNC) { + return 0; + } + + if (fsync(tdb->fd) != 0) { + tdb->ecode = TDB_ERR_IO; + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction: fsync failed\n"); + return -1; + } +#ifdef MS_SYNC + if (tdb->map_ptr) { + tdb_off_t moffset = offset & ~(getpagesize()-1); + if (msync(moffset + (char *)tdb->map_ptr, + length + (offset - moffset), MS_SYNC) != 0) { + tdb->ecode = TDB_ERR_IO; + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction: msync failed - %s\n", + strerror(errno)); + return -1; + } + } +#endif + return 0; +} + + +static void _tdb_transaction_cancel(struct tdb_context *tdb) +{ + int i; + + if (tdb->transaction == NULL) { + tdb->ecode = TDB_ERR_EINVAL; + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_cancel: no transaction\n"); + return; + } + + if (tdb->transaction->nesting != 0) { + tdb->transaction->transaction_error = 1; + tdb->transaction->nesting--; + return; + } + + tdb->map_size = tdb->transaction->old_map_size; + + /* free all the transaction blocks */ + for (i=0;itransaction->num_blocks;i++) { + if (tdb->transaction->blocks[i] != NULL) { + free(tdb->transaction->blocks[i]); + } + } + SAFE_FREE(tdb->transaction->blocks); + + if (tdb->transaction->magic_offset) { + const struct tdb_methods *methods = tdb->transaction->io_methods; + uint64_t invalid = TDB_RECOVERY_INVALID_MAGIC; + + /* remove the recovery marker */ + if (methods->write(tdb, tdb->transaction->magic_offset, + &invalid, sizeof(invalid)) == -1 || + transaction_sync(tdb, tdb->transaction->magic_offset, + sizeof(invalid)) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_cancel: failed to remove" + " recovery magic\n"); + } + } + + if (tdb->allrecord_lock.count) + tdb_allrecord_unlock(tdb, tdb->allrecord_lock.ltype); + + /* restore the normal io methods */ + tdb->methods = tdb->transaction->io_methods; + + tdb_transaction_unlock(tdb, F_WRLCK); + tdb_unlock_expand(tdb, F_WRLCK); + + if (tdb_has_open_lock(tdb)) + tdb_unlock_open(tdb); + + SAFE_FREE(tdb->transaction); +} + +/* + start a tdb transaction. No token is returned, as only a single + transaction is allowed to be pending per tdb_context +*/ +int tdb_transaction_start(struct tdb_context *tdb) +{ + /* some sanity checks */ + if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) { + tdb->ecode = TDB_ERR_EINVAL; + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_start: cannot start a transaction" + " on a read-only or internal db\n"); + return -1; + } + + /* cope with nested tdb_transaction_start() calls */ + if (tdb->transaction != NULL) { + tdb->ecode = TDB_ERR_NESTING; + return -1; + } + + if (tdb_has_hash_locks(tdb)) { + /* the caller must not have any locks when starting a + transaction as otherwise we'll be screwed by lack + of nested locks in posix */ + tdb->ecode = TDB_ERR_LOCK; + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_start: cannot start a transaction" + " with locks held\n"); + return -1; + } + + tdb->transaction = (struct tdb_transaction *) + calloc(sizeof(struct tdb_transaction), 1); + if (tdb->transaction == NULL) { + tdb->ecode = TDB_ERR_OOM; + return -1; + } + + /* get the transaction write lock. This is a blocking lock. As + discussed with Volker, there are a number of ways we could + make this async, which we will probably do in the future */ + if (tdb_transaction_lock(tdb, F_WRLCK) == -1) { + SAFE_FREE(tdb->transaction->blocks); + SAFE_FREE(tdb->transaction); + return -1; + } + + /* get a read lock over entire file. This is upgraded to a write + lock during the commit */ + if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) { + goto fail_allrecord_lock; + } + + if (tdb_lock_expand(tdb, F_WRLCK) != 0) { + goto fail_expand_lock; + } + + /* make sure we know about any file expansions already done by + anyone else */ + tdb->methods->oob(tdb, tdb->map_size + 1, true); + tdb->transaction->old_map_size = tdb->map_size; + + /* finally hook the io methods, replacing them with + transaction specific methods */ + tdb->transaction->io_methods = tdb->methods; + tdb->methods = &transaction_methods; + return 0; + +fail_expand_lock: + tdb_allrecord_unlock(tdb, F_RDLCK); +fail_allrecord_lock: + tdb_transaction_unlock(tdb, F_WRLCK); + SAFE_FREE(tdb->transaction->blocks); + SAFE_FREE(tdb->transaction); + return -1; +} + + +/* + cancel the current transaction +*/ +void tdb_transaction_cancel(struct tdb_context *tdb) +{ + _tdb_transaction_cancel(tdb); +} + +/* + work out how much space the linearised recovery data will consume +*/ +static tdb_len_t tdb_recovery_size(struct tdb_context *tdb) +{ + tdb_len_t recovery_size = 0; + int i; + + recovery_size = sizeof(tdb_len_t); + for (i=0;itransaction->num_blocks;i++) { + if (i * getpagesize() >= tdb->transaction->old_map_size) { + break; + } + if (tdb->transaction->blocks[i] == NULL) { + continue; + } + recovery_size += 2*sizeof(tdb_off_t); + if (i == tdb->transaction->num_blocks-1) { + recovery_size += tdb->transaction->last_block_size; + } else { + recovery_size += getpagesize(); + } + } + + return recovery_size; +} + +/* + allocate the recovery area, or use an existing recovery area if it is + large enough +*/ +static int tdb_recovery_allocate(struct tdb_context *tdb, + tdb_len_t *recovery_size, + tdb_off_t *recovery_offset, + tdb_len_t *recovery_max_size) +{ + struct tdb_recovery_record rec; + const struct tdb_methods *methods = tdb->transaction->io_methods; + tdb_off_t recovery_head; + size_t addition; + + recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery)); + if (recovery_head == TDB_OFF_ERR) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_recovery_allocate:" + " failed to read recovery head\n"); + return -1; + } + + if (recovery_head != 0) { + if (methods->read(tdb, recovery_head, &rec, sizeof(rec))) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_recovery_allocate:" + " failed to read recovery record\n"); + return -1; + } + tdb_convert(tdb, &rec, sizeof(rec)); + /* ignore invalid recovery regions: can happen in crash */ + if (rec.magic != TDB_RECOVERY_MAGIC && + rec.magic != TDB_RECOVERY_INVALID_MAGIC) { + recovery_head = 0; + } + } + + *recovery_size = tdb_recovery_size(tdb); + + if (recovery_head != 0 && *recovery_size <= rec.max_len) { + /* it fits in the existing area */ + *recovery_max_size = rec.max_len; + *recovery_offset = recovery_head; + return 0; + } + + /* we need to free up the old recovery area, then allocate a + new one at the end of the file. Note that we cannot use + normal allocation to allocate the new one as that might return + us an area that is being currently used (as of the start of + the transaction) */ + if (recovery_head != 0) { + if (add_free_record(tdb, recovery_head, + sizeof(rec) + rec.max_len) != 0) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_recovery_allocate:" + " failed to free previous recovery area\n"); + return -1; + } + } + + /* the tdb_free() call might have increased the recovery size */ + *recovery_size = tdb_recovery_size(tdb); + + /* round up to a multiple of page size */ + *recovery_max_size + = (((sizeof(rec) + *recovery_size) + getpagesize()-1) + & ~(getpagesize()-1)) + - sizeof(rec); + *recovery_offset = tdb->map_size; + recovery_head = *recovery_offset; + + /* Restore ->map_size before calling underlying expand_file. + Also so that we don't try to expand the file again in the + transaction commit, which would destroy the recovery + area */ + addition = (tdb->map_size - tdb->transaction->old_map_size) + + sizeof(rec) + *recovery_max_size; + tdb->map_size = tdb->transaction->old_map_size; + if (methods->expand_file(tdb, addition) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_recovery_allocate:" + " failed to create recovery area\n"); + return -1; + } + + /* we have to reset the old map size so that we don't try to + expand the file again in the transaction commit, which + would destroy the recovery area */ + tdb->transaction->old_map_size = tdb->map_size; + + /* write the recovery header offset and sync - we can sync without a race here + as the magic ptr in the recovery record has not been set */ + tdb_convert(tdb, &recovery_head, sizeof(recovery_head)); + if (methods->write(tdb, offsetof(struct tdb_header, recovery), + &recovery_head, sizeof(tdb_off_t)) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_recovery_allocate:" + " failed to write recovery head\n"); + return -1; + } + transaction_write_existing(tdb, offsetof(struct tdb_header, recovery), + &recovery_head, + sizeof(tdb_off_t)); + return 0; +} + +/* Set up header for the recovery record. */ +static void set_recovery_header(struct tdb_recovery_record *rec, + uint64_t magic, + uint64_t datalen, uint64_t actuallen, + uint64_t oldsize) +{ + rec->magic = magic; + rec->max_len = actuallen; + rec->len = datalen; + rec->eof = oldsize; +} + +/* + setup the recovery data that will be used on a crash during commit +*/ +static int transaction_setup_recovery(struct tdb_context *tdb, + tdb_off_t *magic_offset) +{ + tdb_len_t recovery_size; + unsigned char *data, *p; + const struct tdb_methods *methods = tdb->transaction->io_methods; + struct tdb_recovery_record *rec; + tdb_off_t recovery_offset, recovery_max_size; + tdb_off_t old_map_size = tdb->transaction->old_map_size; + uint64_t magic, tailer; + int i; + + /* + check that the recovery area has enough space + */ + if (tdb_recovery_allocate(tdb, &recovery_size, + &recovery_offset, &recovery_max_size) == -1) { + return -1; + } + + data = (unsigned char *)malloc(recovery_size + sizeof(*rec)); + if (data == NULL) { + tdb->ecode = TDB_ERR_OOM; + return -1; + } + + rec = (struct tdb_recovery_record *)data; + set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC, + recovery_size, recovery_max_size, old_map_size); + tdb_convert(tdb, rec, sizeof(*rec)); + + /* build the recovery data into a single blob to allow us to do a single + large write, which should be more efficient */ + p = data + sizeof(*rec); + for (i=0;itransaction->num_blocks;i++) { + tdb_off_t offset; + tdb_len_t length; + + if (tdb->transaction->blocks[i] == NULL) { + continue; + } + + offset = i * getpagesize(); + length = getpagesize(); + if (i == tdb->transaction->num_blocks-1) { + length = tdb->transaction->last_block_size; + } + + if (offset >= old_map_size) { + continue; + } + if (offset + length > tdb->map_size) { + tdb->ecode = TDB_ERR_CORRUPT; + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_setup_recovery:" + " transaction data over new region boundary\n"); + free(data); + return -1; + } + memcpy(p, &offset, sizeof(offset)); + memcpy(p + sizeof(offset), &length, sizeof(length)); + tdb_convert(tdb, p, sizeof(offset) + sizeof(length)); + + /* the recovery area contains the old data, not the + new data, so we have to call the original tdb_read + method to get it */ + if (methods->read(tdb, offset, + p + sizeof(offset) + sizeof(length), + length) != 0) { + free(data); + return -1; + } + p += sizeof(offset) + sizeof(length) + length; + } + + /* and the tailer */ + tailer = sizeof(*rec) + recovery_max_size; + memcpy(p, &tailer, sizeof(tailer)); + tdb_convert(tdb, p, sizeof(tailer)); + + /* write the recovery data to the recovery area */ + if (methods->write(tdb, recovery_offset, data, + sizeof(*rec) + recovery_size) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_setup_recovery:" + " failed to write recovery data\n"); + free(data); + return -1; + } + transaction_write_existing(tdb, recovery_offset, data, + sizeof(*rec) + recovery_size); + + /* as we don't have ordered writes, we have to sync the recovery + data before we update the magic to indicate that the recovery + data is present */ + if (transaction_sync(tdb, recovery_offset, + sizeof(*rec) + recovery_size) == -1) { + free(data); + return -1; + } + + free(data); + + magic = TDB_RECOVERY_MAGIC; + tdb_convert(tdb, &magic, sizeof(magic)); + + *magic_offset = recovery_offset + offsetof(struct tdb_recovery_record, + magic); + + if (methods->write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_setup_recovery:" + " failed to write recovery magic\n"); + return -1; + } + transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)); + + /* ensure the recovery magic marker is on disk */ + if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) { + return -1; + } + + return 0; +} + +static int _tdb_transaction_prepare_commit(struct tdb_context *tdb) +{ + const struct tdb_methods *methods; + + if (tdb->transaction == NULL) { + tdb->ecode = TDB_ERR_EINVAL; + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit: no transaction\n"); + return -1; + } + + if (tdb->transaction->prepared) { + tdb->ecode = TDB_ERR_EINVAL; + _tdb_transaction_cancel(tdb); + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit:" + " transaction already prepared\n"); + return -1; + } + + if (tdb->transaction->transaction_error) { + tdb->ecode = TDB_ERR_IO; + _tdb_transaction_cancel(tdb); + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit:" + " transaction error pending\n"); + return -1; + } + + + if (tdb->transaction->nesting != 0) { + tdb->transaction->nesting--; + return 0; + } + + /* check for a null transaction */ + if (tdb->transaction->blocks == NULL) { + return 0; + } + + methods = tdb->transaction->io_methods; + + /* upgrade the main transaction lock region to a write lock */ + if (tdb_allrecord_upgrade(tdb) == -1) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit:" + " failed to upgrade hash locks\n"); + _tdb_transaction_cancel(tdb); + return -1; + } + + /* get the open lock - this prevents new users attaching to the database + during the commit */ + if (tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK) == -1) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit:" + " failed to get open lock\n"); + _tdb_transaction_cancel(tdb); + return -1; + } + + if (!(tdb->flags & TDB_NOSYNC)) { + /* write the recovery data to the end of the file */ + if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit:" + " failed to setup recovery data\n"); + _tdb_transaction_cancel(tdb); + return -1; + } + } + + tdb->transaction->prepared = true; + + /* expand the file to the new size if needed */ + if (tdb->map_size != tdb->transaction->old_map_size) { + tdb_len_t add = tdb->map_size - tdb->transaction->old_map_size; + /* Restore original map size for tdb_expand_file */ + tdb->map_size = tdb->transaction->old_map_size; + if (methods->expand_file(tdb, add) == -1) { + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_prepare_commit:" + " expansion failed\n"); + _tdb_transaction_cancel(tdb); + return -1; + } + } + + /* Keep the open lock until the actual commit */ + + return 0; +} + +/* + prepare to commit the current transaction +*/ +int tdb_transaction_prepare_commit(struct tdb_context *tdb) +{ + return _tdb_transaction_prepare_commit(tdb); +} + +/* + commit the current transaction +*/ +int tdb_transaction_commit(struct tdb_context *tdb) +{ + const struct tdb_methods *methods; + int i; + + if (tdb->transaction == NULL) { + tdb->ecode = TDB_ERR_EINVAL; + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_commit: no transaction\n"); + return -1; + } + + tdb_trace(tdb, "tdb_transaction_commit"); + + if (tdb->transaction->transaction_error) { + tdb->ecode = TDB_ERR_IO; + tdb_transaction_cancel(tdb); + tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv, + "tdb_transaction_commit: transaction error pending\n"); + return -1; + } + + + if (tdb->transaction->nesting != 0) { + tdb->transaction->nesting--; + return 0; + } + + /* check for a null transaction */ + if (tdb->transaction->blocks == NULL) { + _tdb_transaction_cancel(tdb); + return 0; + } + + if (!tdb->transaction->prepared) { + int ret = _tdb_transaction_prepare_commit(tdb); + if (ret) + return ret; + } + + methods = tdb->transaction->io_methods; + + /* perform all the writes */ + for (i=0;itransaction->num_blocks;i++) { + tdb_off_t offset; + tdb_len_t length; + + if (tdb->transaction->blocks[i] == NULL) { + continue; + } + + offset = i * getpagesize(); + length = getpagesize(); + if (i == tdb->transaction->num_blocks-1) { + length = tdb->transaction->last_block_size; + } + + if (methods->write(tdb, offset, tdb->transaction->blocks[i], + length) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_commit:" + " write failed during commit\n"); + + /* we've overwritten part of the data and + possibly expanded the file, so we need to + run the crash recovery code */ + tdb->methods = methods; + tdb_transaction_recover(tdb); + + _tdb_transaction_cancel(tdb); + + return -1; + } + SAFE_FREE(tdb->transaction->blocks[i]); + } + + SAFE_FREE(tdb->transaction->blocks); + tdb->transaction->num_blocks = 0; + + /* ensure the new data is on disk */ + if (transaction_sync(tdb, 0, tdb->map_size) == -1) { + return -1; + } + + /* + TODO: maybe write to some dummy hdr field, or write to magic + offset without mmap, before the last sync, instead of the + utime() call + */ + + /* on some systems (like Linux 2.6.x) changes via mmap/msync + don't change the mtime of the file, this means the file may + not be backed up (as tdb rounding to block sizes means that + file size changes are quite rare too). The following forces + mtime changes when a transaction completes */ +#if HAVE_UTIME + utime(tdb->name, NULL); +#endif + + /* use a transaction cancel to free memory and remove the + transaction locks */ + _tdb_transaction_cancel(tdb); + + return 0; +} + + +/* + recover from an aborted transaction. Must be called with exclusive + database write access already established (including the open + lock to prevent new processes attaching) +*/ +int tdb_transaction_recover(struct tdb_context *tdb) +{ + tdb_off_t recovery_head, recovery_eof; + unsigned char *data, *p; + struct tdb_recovery_record rec; + + /* find the recovery area */ + recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery)); + if (recovery_head == TDB_OFF_ERR) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to read recovery head\n"); + return -1; + } + + if (recovery_head == 0) { + /* we have never allocated a recovery record */ + return 0; + } + + /* read the recovery record */ + if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to read recovery record\n"); + return -1; + } + + if (rec.magic != TDB_RECOVERY_MAGIC) { + /* there is no valid recovery data */ + return 0; + } + + if (tdb->read_only) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " attempt to recover read only database\n"); + tdb->ecode = TDB_ERR_CORRUPT; + return -1; + } + + recovery_eof = rec.eof; + + data = (unsigned char *)malloc(rec.len); + if (data == NULL) { + tdb->ecode = TDB_ERR_OOM; + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to allocate recovery data\n"); + return -1; + } + + /* read the full recovery data */ + if (tdb->methods->read(tdb, recovery_head + sizeof(rec), data, + rec.len) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to read recovery data\n"); + return -1; + } + + /* recover the file data */ + p = data; + while (p+sizeof(tdb_off_t)+sizeof(tdb_len_t) < data + rec.len) { + tdb_off_t ofs; + tdb_len_t len; + tdb_convert(tdb, p, sizeof(ofs) + sizeof(len)); + memcpy(&ofs, p, sizeof(ofs)); + memcpy(&len, p + sizeof(ofs), sizeof(len)); + p += sizeof(ofs) + sizeof(len); + + if (tdb->methods->write(tdb, ofs, p, len) == -1) { + free(data); + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to recover %zu bytes at offset %zu\n", + (size_t)len, (size_t)ofs); + return -1; + } + p += len; + } + + free(data); + + if (transaction_sync(tdb, 0, tdb->map_size) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover: failed to sync recovery\n"); + return -1; + } + + /* if the recovery area is after the recovered eof then remove it */ + if (recovery_eof <= recovery_head) { + if (tdb_write_off(tdb, offsetof(struct tdb_header,recovery), 0) + == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to remove recovery head\n"); + return -1; + } + } + + /* remove the recovery magic */ + if (tdb_write_off(tdb, + recovery_head + + offsetof(struct tdb_recovery_record, magic), + TDB_RECOVERY_INVALID_MAGIC) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover:" + " failed to remove recovery magic\n"); + return -1; + } + + if (transaction_sync(tdb, 0, recovery_eof) == -1) { + tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv, + "tdb_transaction_recover: failed to sync2 recovery\n"); + return -1; + } + + tdb->log(tdb, TDB_DEBUG_TRACE, tdb->log_priv, + "tdb_transaction_recover: recovered %zu byte database\n", + (size_t)recovery_eof); + + /* all done */ + return 0; +} + +/* Any I/O failures we say "needs recovery". */ +bool tdb_needs_recovery(struct tdb_context *tdb) +{ + tdb_off_t recovery_head; + struct tdb_recovery_record rec; + + /* find the recovery area */ + recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery)); + if (recovery_head == TDB_OFF_ERR) { + return true; + } + + if (recovery_head == 0) { + /* we have never allocated a recovery record */ + return false; + } + + /* read the recovery record */ + if (tdb_read_convert(tdb, recovery_head, &rec, sizeof(rec)) == -1) { + return true; + } + + return (rec.magic == TDB_RECOVERY_MAGIC); +}