]> git.ozlabs.org Git - ccan/commitdiff
tdb2: TDB_ATTRIBUTE_FLOCK support
authorRusty Russell <rusty@rustcorp.com.au>
Thu, 7 Apr 2011 07:18:43 +0000 (16:48 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Thu, 7 Apr 2011 07:18:43 +0000 (16:48 +0930)
This allows overriding of low-level locking functions.  This allows
special effects such as non-blocking operations, and lock proxying.

We rename some local lock vars to l to avoid -Wshadow warnings.

ccan/tdb2/lock.c
ccan/tdb2/open.c
ccan/tdb2/private.h
ccan/tdb2/tdb2.h
ccan/tdb2/test/run-82-lockattr.c [new file with mode: 0644]

index 8bd9c4f0f4bca5c9a5079a74120919a1ae73c3d5..2878eaeee5e98a9a034005a9dd0e626fa733ffc0 100644 (file)
@@ -37,7 +37,7 @@ static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
                          call);
 }
 
-/* If we fork, we no longer really own locks. */
+/* If we fork, we no longer really own locks: preserves errno */
 static bool check_lock_pid(struct tdb_context *tdb,
                           const char *call, bool log)
 {
@@ -60,34 +60,59 @@ static bool check_lock_pid(struct tdb_context *tdb,
        return false;
 }
 
-static int fcntl_lock(struct tdb_context *tdb,
-                     int rw, off_t off, off_t len, bool waitflag)
+int tdb_fcntl_lock(int fd, int rw, off_t off, off_t len, bool waitflag,
+                  void *unused)
+{
+       struct flock fl;
+       int ret;
+
+       do {
+               fl.l_type = rw;
+               fl.l_whence = SEEK_SET;
+               fl.l_start = off;
+               fl.l_len = len;
+
+               if (waitflag)
+                       ret = fcntl(fd, F_SETLKW, &fl);
+               else
+                       ret = fcntl(fd, F_SETLK, &fl);
+       } while (ret != 0 && errno == EINTR);
+       return ret;
+}
+
+int tdb_fcntl_unlock(int fd, int rw, off_t off, off_t len, void *unused)
 {
        struct flock fl;
+       int ret;
+
+       do {
+               fl.l_type = F_UNLCK;
+               fl.l_whence = SEEK_SET;
+               fl.l_start = off;
+               fl.l_len = len;
 
-       fl.l_type = rw;
-       fl.l_whence = SEEK_SET;
-       fl.l_start = off;
-       fl.l_len = len;
-       fl.l_pid = 0;
+               ret = fcntl(fd, F_SETLKW, &fl);
+       } while (ret != 0 && errno == EINTR);
+       return ret;
+}
 
+static int lock(struct tdb_context *tdb,
+                     int rw, off_t off, off_t len, bool waitflag)
+{
        if (tdb->file->allrecord_lock.count == 0
            && tdb->file->num_lockrecs == 0) {
                tdb->file->locker = getpid();
        }
 
        add_stat(tdb, lock_lowlevel, 1);
-       if (waitflag)
-               return fcntl(tdb->file->fd, F_SETLKW, &fl);
-       else {
+       if (!waitflag)
                add_stat(tdb, lock_nonblock, 1);
-               return fcntl(tdb->file->fd, F_SETLK, &fl);
-       }
+       return tdb->lock_fn(tdb->file->fd, rw, off, len, waitflag,
+                           tdb->lock_data);
 }
 
-static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
+static int unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
 {
-       struct flock fl;
 #if 0 /* Check they matched up locks and unlocks correctly. */
        char line[80];
        FILE *locks;
@@ -146,13 +171,7 @@ static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
        fclose(locks);
 #endif
 
-       fl.l_type = F_UNLCK;
-       fl.l_whence = SEEK_SET;
-       fl.l_start = off;
-       fl.l_len = len;
-       fl.l_pid = 0;
-
-       return fcntl(tdb->file->fd, F_SETLKW, &fl);
+       return tdb->unlock_fn(tdb->file->fd, rw, off, len, tdb->lock_data);
 }
 
 /* a byte range locking function - return 0 on success
@@ -183,16 +202,13 @@ static enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
                                  (long long)(offset + len));
        }
 
-       do {
-               ret = fcntl_lock(tdb, rw_type, offset, len,
-                                flags & TDB_LOCK_WAIT);
-       } while (ret == -1 && errno == EINTR);
-
-       if (ret == -1) {
+       ret = lock(tdb, rw_type, offset, len, flags & TDB_LOCK_WAIT);
+       if (ret != 0) {
                /* Generic lock error. errno set by fcntl.
                 * EAGAIN is an expected return from non-blocking
                 * locks. */
-               if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
+               if (!(flags & TDB_LOCK_PROBE)
+                   && (errno != EAGAIN && errno != EINTR)) {
                        tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
                                   "tdb_brlock failed (fd=%d) at"
                                   " offset %zu rw_type=%d flags=%d len=%zu:"
@@ -214,17 +230,15 @@ static enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
                return TDB_SUCCESS;
        }
 
-       do {
-               ret = fcntl_unlock(tdb, rw_type, offset, len);
-       } while (ret == -1 && errno == EINTR);
+       ret = unlock(tdb, rw_type, offset, len);
 
        /* If we fail, *then* we verify that we owned the lock.  If not, ok. */
        if (ret == -1 && check_lock_pid(tdb, "tdb_brunlock", false)) {
                return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
                                  "tdb_brunlock failed (fd=%d) at offset %zu"
-                                 " rw_type=%d len=%zu",
+                                 " rw_type=%d len=%zu: %s",
                                  tdb->file->fd, (size_t)offset, rw_type,
-                                 (size_t)len);
+                                 (size_t)len, strerror(errno));
        }
        return TDB_SUCCESS;
 }
@@ -276,8 +290,11 @@ enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
                tv.tv_usec = 1;
                select(0, NULL, NULL, NULL, &tv);
        }
-       return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
-                         "tdb_allrecord_upgrade failed");
+
+       if (errno != EAGAIN && errno != EINTR)
+               tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
+                          "tdb_allrecord_upgrade failed");
+       return TDB_ERR_LOCK;
 }
 
 static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset,
@@ -699,7 +716,7 @@ enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
                               int ltype, enum tdb_lock_flags waitflag)
 {
        /* FIXME: Do this properly, using hlock_range */
-       unsigned lock = TDB_HASH_LOCK_START
+       unsigned l = TDB_HASH_LOCK_START
                + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        /* a allrecord lock allows us to avoid per chain locks */
@@ -732,14 +749,14 @@ enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
                                  " already have expansion lock");
        }
 
-       return tdb_nest_lock(tdb, lock, ltype, waitflag);
+       return tdb_nest_lock(tdb, l, ltype, waitflag);
 }
 
 enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
                                 tdb_off_t hash_lock,
                                 tdb_len_t hash_range, int ltype)
 {
-       unsigned lock = TDB_HASH_LOCK_START
+       unsigned l = TDB_HASH_LOCK_START
                + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
 
        if (tdb->flags & TDB_NOLOCK)
@@ -755,7 +772,7 @@ enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
                return TDB_SUCCESS;
        }
 
-       return tdb_nest_unlock(tdb, lock, ltype);
+       return tdb_nest_unlock(tdb, l, ltype);
 }
 
 /* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
index f358d0322d1d494ab05d826568b6f32f90b981b2..7aae8b466af9c8b051548ec1d3ac691ad64a0d8b 100644 (file)
@@ -212,6 +212,8 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
        tdb->access = NULL;
        tdb->last_error = TDB_SUCCESS;
        tdb->file = NULL;
+       tdb->lock_fn = fcntl_lock;
+       tdb->unlock_fn = fcntl_unlock;
        tdb_hash_init(tdb);
        tdb_io_init(tdb);
 
@@ -237,6 +239,11 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
                case TDB_ATTRIBUTE_OPENHOOK:
                        openhook = &attr->openhook;
                        break;
+               case TDB_ATTRIBUTE_FLOCK:
+                       tdb->lock_fn = attr->flock.lock;
+                       tdb->unlock_fn = attr->flock.unlock;
+                       tdb->lock_data = attr->flock.data;
+                       break;
                default:
                        ecode = tdb_logerr(tdb, TDB_ERR_EINVAL,
                                           TDB_LOG_USE_ERROR,
@@ -340,7 +347,8 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
        /* ensure there is only one process initialising at once */
        ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
        if (ecode != TDB_SUCCESS) {
-               goto fail;
+               saved_errno = errno;
+               goto fail_errno;
        }
 
        /* call their open hook if they gave us one. */
index a569b1db3667b05e27e6aaf27fdf0afd02182898..9cd8188c70c559c3f0e1c2a6f2845cab4571cc3c 100644 (file)
@@ -376,6 +376,11 @@ struct tdb_context {
        void *hash_data;
        uint64_t hash_seed;
 
+       /* low level (fnctl) lock functions. */
+       int (*lock_fn)(int fd, int rw, off_t off, off_t len, bool w, void *);
+       int (*unlock_fn)(int fd, int rw, off_t off, off_t len, void *);
+       void *lock_data;
+
        /* Set if we are in a transaction. */
        struct tdb_transaction *transaction;
        
@@ -584,6 +589,10 @@ bool tdb_has_expansion_lock(struct tdb_context *tdb);
 /* If it needs recovery, grab all the locks and do it. */
 enum TDB_ERROR tdb_lock_and_recover(struct tdb_context *tdb);
 
+/* Default lock and unlock functions. */
+int tdb_fcntl_lock(int fd, int rw, off_t off, off_t len, bool waitflag, void *);
+int tdb_fcntl_unlock(int fd, int rw, off_t off, off_t len, void *);
+
 /* transaction.c: */
 enum TDB_ERROR tdb_transaction_recover(struct tdb_context *tdb);
 tdb_bool_err tdb_needs_recovery(struct tdb_context *tdb);
index d78ef974b340cece0ac3306c7052edd21928deb5..b88568a881568a89381aba0e557216eb0d2ecc6e 100644 (file)
@@ -610,7 +610,8 @@ enum tdb_attribute_type {
        TDB_ATTRIBUTE_HASH = 1,
        TDB_ATTRIBUTE_SEED = 2,
        TDB_ATTRIBUTE_STATS = 3,
-       TDB_ATTRIBUTE_OPENHOOK = 4
+       TDB_ATTRIBUTE_OPENHOOK = 4,
+       TDB_ATTRIBUTE_FLOCK = 5
 };
 
 /**
@@ -736,6 +737,25 @@ struct tdb_attribute_openhook {
        void *data;
 };
 
+/**
+ * struct tdb_attribute_flock - tdb special effects hook for file locking
+ *
+ * This attribute contains function to call to place locks on a file; it can
+ * be used to support non-blocking operations or lock proxying.
+ *
+ * They should return 0 on success, -1 on failure and set errno.
+ *
+ * An error will be logged on error if errno is neither EAGAIN nor EINTR
+ * (normally it would only return EAGAIN if waitflag is false, and
+ * loop internally on EINTR).
+ */
+struct tdb_attribute_flock {
+       struct tdb_attribute_base base; /* .attr = TDB_ATTRIBUTE_FLOCK */
+       int (*lock)(int fd,int rw, off_t off, off_t len, bool waitflag, void *);
+       int (*unlock)(int fd, int rw, off_t off, off_t len, void *);
+       void *data;
+};
+
 /**
  * union tdb_attribute - tdb attributes.
  *
@@ -744,7 +764,7 @@ struct tdb_attribute_openhook {
  * See also:
  *     struct tdb_attribute_log, struct tdb_attribute_hash,
  *     struct tdb_attribute_seed, struct tdb_attribute_stats,
- *     struct tdb_attribute_openhook.
+ *     struct tdb_attribute_openhook, struct tdb_attribute_flock.
  */
 union tdb_attribute {
        struct tdb_attribute_base base;
@@ -753,6 +773,7 @@ union tdb_attribute {
        struct tdb_attribute_seed seed;
        struct tdb_attribute_stats stats;
        struct tdb_attribute_openhook openhook;
+       struct tdb_attribute_flock flock;
 };
 
 #ifdef  __cplusplus
diff --git a/ccan/tdb2/test/run-82-lockattr.c b/ccan/tdb2/test/run-82-lockattr.c
new file mode 100644 (file)
index 0000000..bfc2653
--- /dev/null
@@ -0,0 +1,263 @@
+#include <ccan/tdb2/tdb.c>
+#include <ccan/tdb2/open.c>
+#include <ccan/tdb2/free.c>
+#include <ccan/tdb2/lock.c>
+#include <ccan/tdb2/io.c>
+#include <ccan/tdb2/hash.c>
+#include <ccan/tdb2/check.c>
+#include <ccan/tdb2/transaction.c>
+#include <ccan/tdb2/traverse.c>
+#include <ccan/tap/tap.h>
+#include "logging.h"
+
+static int mylock(int fd, int rw, off_t off, off_t len, bool waitflag,
+                 void *_err)
+{
+       int *lock_err = _err;
+       struct flock fl;
+       int ret;
+
+       if (*lock_err) {
+               errno = *lock_err;
+               return -1;
+       }
+
+       do {
+               fl.l_type = rw;
+               fl.l_whence = SEEK_SET;
+               fl.l_start = off;
+               fl.l_len = len;
+
+               if (waitflag)
+                       ret = fcntl(fd, F_SETLKW, &fl);
+               else
+                       ret = fcntl(fd, F_SETLK, &fl);
+       } while (ret != 0 && errno == EINTR);
+
+       return ret;
+}
+
+static int myunlock(int fd, int rw, off_t off, off_t len, void *_err)
+{
+       int *lock_err = _err;
+       struct flock fl;
+       int ret;
+
+       if (*lock_err) {
+               errno = *lock_err;
+               return -1;
+       }
+
+       do {
+               fl.l_type = F_UNLCK;
+               fl.l_whence = SEEK_SET;
+               fl.l_start = off;
+               fl.l_len = len;
+
+               ret = fcntl(fd, F_SETLKW, &fl);
+       } while (ret != 0 && errno == EINTR);
+
+       return ret;
+}
+
+static int trav_err;
+static int trav(struct tdb_context *tdb, TDB_DATA k, TDB_DATA d, int *err)
+{
+       *err = trav_err;
+       return 0;
+}
+
+int main(int argc, char *argv[])
+{
+       unsigned int i;
+       struct tdb_context *tdb;
+       int flags[] = { TDB_DEFAULT, TDB_NOMMAP,
+                       TDB_CONVERT, TDB_NOMMAP|TDB_CONVERT };
+       union tdb_attribute lock_attr;
+       struct tdb_data key = tdb_mkdata("key", 3);
+       struct tdb_data data = tdb_mkdata("data", 4);
+       int lock_err;
+
+       lock_attr.base.attr = TDB_ATTRIBUTE_FLOCK;
+       lock_attr.base.next = &tap_log_attr;
+       lock_attr.flock.lock = mylock;
+       lock_attr.flock.unlock = myunlock;
+       lock_attr.flock.data = &lock_err;
+
+       plan_tests(sizeof(flags) / sizeof(flags[0]) * 80);
+
+       for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
+               struct tdb_data d;
+
+               /* Nonblocking open; expect no error message. */
+               lock_err = EAGAIN;
+               tdb = tdb_open("run-82-lockattr.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
+               ok(errno == lock_err, "Errno is %u", errno);
+               ok1(!tdb);
+               ok1(tap_log_messages == 0);
+
+               lock_err = EINTR;
+               tdb = tdb_open("run-82-lockattr.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
+               ok(errno == lock_err, "Errno is %u", errno);
+               ok1(!tdb);
+               ok1(tap_log_messages == 0);
+
+               /* Forced fail open. */
+               lock_err = ENOMEM;
+               tdb = tdb_open("run-82-lockattr.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
+               ok1(errno == lock_err);
+               ok1(!tdb);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               lock_err = 0;
+               tdb = tdb_open("run-82-lockattr.tdb", flags[i],
+                              O_RDWR|O_CREAT|O_TRUNC, 0600, &lock_attr);
+               if (!ok1(tdb))
+                       continue;
+               ok1(tap_log_messages == 0);
+
+               /* Nonblocking store. */
+               lock_err = EAGAIN;
+               ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               /* Nonblocking fetch. */
+               lock_err = EAGAIN;
+               ok1(!tdb_exists(tdb, key));
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(!tdb_exists(tdb, key));
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(!tdb_exists(tdb, key));
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               lock_err = EAGAIN;
+               ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_fetch(tdb, key, &d) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               /* Nonblocking delete. */
+               lock_err = EAGAIN;
+               ok1(tdb_delete(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_delete(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_delete(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               /* Nonblocking locks. */
+               lock_err = EAGAIN;
+               ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_chainlock(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               lock_err = EAGAIN;
+               ok1(tdb_chainlock_read(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_chainlock_read(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_chainlock_read(tdb, key) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               lock_err = EAGAIN;
+               ok1(tdb_lockall(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_lockall(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_lockall(tdb) == TDB_ERR_LOCK);
+               /* This actually does divide and conquer. */
+               ok1(tap_log_messages > 0);
+               tap_log_messages = 0;
+
+               lock_err = EAGAIN;
+               ok1(tdb_lockall_read(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_lockall_read(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_lockall_read(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages > 0);
+               tap_log_messages = 0;
+
+               /* Nonblocking traverse; go nonblock partway through. */
+               lock_err = 0;
+               ok1(tdb_store(tdb, key, data, TDB_REPLACE) == 0);
+               trav_err = EAGAIN;
+               ok1(tdb_traverse(tdb, trav, &lock_err) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               trav_err = EINTR;
+               lock_err = 0;
+               ok1(tdb_traverse(tdb, trav, &lock_err) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               trav_err = ENOMEM;
+               lock_err = 0;
+               ok1(tdb_traverse(tdb, trav, &lock_err) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               /* Nonblocking transactions. */
+               lock_err = EAGAIN;
+               ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = EINTR;
+               ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+               lock_err = ENOMEM;
+               ok1(tdb_transaction_start(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 1);
+               tap_log_messages = 0;
+
+               /* Nonblocking transaction prepare. */
+               lock_err = 0;
+               ok1(tdb_transaction_start(tdb) == 0);
+               ok1(tdb_delete(tdb, key) == 0);
+
+               lock_err = EAGAIN;
+               ok1(tdb_transaction_prepare_commit(tdb) == TDB_ERR_LOCK);
+               ok1(tap_log_messages == 0);
+
+               lock_err = 0;
+               ok1(tdb_transaction_prepare_commit(tdb) == 0);
+               ok1(tdb_transaction_commit(tdb) == 0);
+
+               /* And the transaction was committed, right? */
+               ok1(!tdb_exists(tdb, key));
+               tdb_close(tdb);
+               ok1(tap_log_messages == 0);
+       }
+       return exit_status();
+}