git.ozlabs.org Git - ccan/blobdiff - ccan/tdb2/io.c
tdb2: add TDB_RDONLY flag, allow setting/unsetting it.
[ccan] / ccan / tdb2 / io.c
index cdee88aa1fd57321220d121a5bfe381332dc9269..24c70084e9719a513cafae66be5ed30d3c7dcbf6 100644 (file)
@@ -1,4 +1,4 @@
- /* 
+ /*
    Unix SMB/CIFS implementation.
 
    trivial database library
    License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
 #include "private.h"
+#include <assert.h>
 #include <ccan/likely/likely.h>
 
-void tdb_munmap(struct tdb_context *tdb)
+void tdb_munmap(struct tdb_file *file)
 {
-       if (tdb->flags & TDB_INTERNAL)
+       if (file->fd == -1)
                return;
 
-       if (tdb->map_ptr) {
-               munmap(tdb->map_ptr, tdb->map_size);
-               tdb->map_ptr = NULL;
+       if (file->map_ptr) {
+               munmap(file->map_ptr, file->map_size);
+               file->map_ptr = NULL;
        }
 }
 
 void tdb_mmap(struct tdb_context *tdb)
 {
+       int mmap_flags;
+
        if (tdb->flags & TDB_INTERNAL)
                return;
 
        if (tdb->flags & TDB_NOMMAP)
                return;
 
-       tdb->map_ptr = mmap(NULL, tdb->map_size, 
-                           PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
-                           MAP_SHARED, tdb->fd, 0);
+       if ((tdb->open_flags & O_ACCMODE) == O_RDONLY)
+               mmap_flags = PROT_READ;
+       else
+               mmap_flags = PROT_READ | PROT_WRITE;
+
+       /* size_t can be smaller than off_t. */
+       if ((size_t)tdb->file->map_size == tdb->file->map_size) {
+               tdb->file->map_ptr = mmap(NULL, tdb->file->map_size,
+                                         mmap_flags,
+                                         MAP_SHARED, tdb->file->fd, 0);
+       } else
+               tdb->file->map_ptr = MAP_FAILED;
 
        /*
         * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
         */
-       if (tdb->map_ptr == MAP_FAILED) {
-               tdb->map_ptr = NULL;
-               tdb->log(tdb, TDB_DEBUG_WARNING, tdb->log_priv,
-                        "tdb_mmap failed for size %lld (%s)\n", 
-                        (long long)tdb->map_size, strerror(errno));
+       if (tdb->file->map_ptr == MAP_FAILED) {
+               tdb->file->map_ptr = NULL;
+               tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
+                          "tdb_mmap failed for size %lld (%s)",
+                          (long long)tdb->file->map_size, strerror(errno));
        }
 }
 
 /* check for an out of bounds access - if it is out of bounds then
    see if the database has been expanded by someone else and expand
-   if necessary 
-   note that "len" is the minimum length needed for the db
+   if necessary
+   note that "len" is the minimum length needed for the db.
+
+   If probe is true, len being too large isn't a failure.
 */
-static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
+static enum TDB_ERROR tdb_oob(struct tdb_context *tdb, tdb_off_t len,
+                             bool probe)
 {
        struct stat st;
-       if (len <= tdb->map_size)
-               return 0;
+       enum TDB_ERROR ecode;
+
+       /* We can't hold pointers during this: we could unmap! */
+       assert(!tdb->direct_access
+              || (tdb->flags & TDB_NOLOCK)
+              || tdb_has_expansion_lock(tdb));
+
+       if (len <= tdb->file->map_size)
+               return TDB_SUCCESS;
        if (tdb->flags & TDB_INTERNAL) {
-               if (!probe) {
-                       /* Ensure ecode is set for log fn. */
-                       tdb->ecode = TDB_ERR_IO;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "tdb_oob len %lld beyond internal"
-                                " malloc size %lld\n",
-                                (long long)len,
-                                (long long)tdb->map_size);
-               }
-               return -1;
+               if (probe)
+                       return TDB_SUCCESS;
+
+               tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                          "tdb_oob len %lld beyond internal"
+                          " malloc size %lld",
+                          (long long)len,
+                          (long long)tdb->file->map_size);
+               return TDB_ERR_IO;
        }
 
-       if (fstat(tdb->fd, &st) == -1) {
-               tdb->ecode = TDB_ERR_IO;
-               return -1;
+       ecode = tdb_lock_expand(tdb, F_RDLCK);
+       if (ecode != TDB_SUCCESS) {
+               return ecode;
        }
 
-       if (st.st_size < (size_t)len) {
-               if (!probe) {
-                       /* Ensure ecode is set for log fn. */
-                       tdb->ecode = TDB_ERR_IO;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "tdb_oob len %lld beyond eof at %lld\n",
-                                (long long)len, (long long)st.st_size);
-               }
-               return -1;
+       if (fstat(tdb->file->fd, &st) != 0) {
+               tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                          "Failed to fstat file: %s", strerror(errno));
+               tdb_unlock_expand(tdb, F_RDLCK);
+               return TDB_ERR_IO;
        }
 
-       /* Unmap, update size, remap */
-       tdb_munmap(tdb);
-       tdb->map_size = st.st_size;
-       tdb_mmap(tdb);
-       return 0;
-}
-
-static void *tdb_direct(struct tdb_context *tdb, tdb_off_t off, size_t len)
-{
-       if (unlikely(!tdb->map_ptr))
-               return NULL;
-
-       /* FIXME: We can do a subset of this! */
-       if (tdb->transaction)
-               return NULL;
+       tdb_unlock_expand(tdb, F_RDLCK);
 
-       if (unlikely(tdb_oob(tdb, off + len, true) == -1))
-               return NULL;
-       return (char *)tdb->map_ptr + off;
-}
+       if (st.st_size < (size_t)len) {
+               if (probe)
+                       return TDB_SUCCESS;
 
-/* Either make a copy into pad and return that, or return ptr into mmap. */
-/* Note: pad has to be a real object, so we can't get here if len
- * overflows size_t */
-void *tdb_get(struct tdb_context *tdb, tdb_off_t off, void *pad, size_t len)
-{
-       if (likely(!(tdb->flags & TDB_CONVERT))) {
-               void *ret = tdb_direct(tdb, off, len);
-               if (ret)
-                       return ret;
+               tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                          "tdb_oob len %zu beyond eof at %zu",
+                          (size_t)len, st.st_size);
+               return TDB_ERR_IO;
        }
 
-       if (unlikely(tdb_oob(tdb, off + len, false) == -1))
-               return NULL;
+       /* Unmap, update size, remap */
+       tdb_munmap(tdb->file);
 
-       if (tdb->methods->read(tdb, off, pad, len) == -1)
-               return NULL;
-       return tdb_convert(tdb, pad, len);
+       tdb->file->map_size = st.st_size;
+       tdb_mmap(tdb);
+       return TDB_SUCCESS;
 }
 
 /* Endian conversion: we only ever deal with 8 byte quantities */
 void *tdb_convert(const struct tdb_context *tdb, void *buf, tdb_len_t size)
 {
-       if (unlikely((tdb->flags & TDB_CONVERT))) {
+       assert(size % 8 == 0);
+       if (unlikely((tdb->flags & TDB_CONVERT)) && buf) {
                uint64_t i, *p = (uint64_t *)buf;
                for (i = 0; i < size / 8; i++)
                        p[i] = bswap_64(p[i]);
@@ -152,468 +150,471 @@ void *tdb_convert(const struct tdb_context *tdb, void *buf, tdb_len_t size)
        return buf;
 }
 
-/* Return first non-zero offset in num offset array, or num. */
+/* Return first non-zero offset in offset array, or end, or -ve error. */
 /* FIXME: Return the off? */
-uint64_t tdb_find_nonzero_off(struct tdb_context *tdb, tdb_off_t off,
-                             uint64_t num)
+uint64_t tdb_find_nonzero_off(struct tdb_context *tdb,
+                             tdb_off_t base, uint64_t start, uint64_t end)
 {
-       uint64_t i, *val;
-       bool alloc = false;
+       uint64_t i;
+       const uint64_t *val;
 
-       val = tdb_direct(tdb, off, num * sizeof(tdb_off_t));
-       if (!unlikely(val)) {
-               val = tdb_alloc_read(tdb, off, num * sizeof(tdb_off_t));
-               if (!val)
-                       return num;
-               alloc = true;
+       /* Zero vs non-zero is the same unconverted: minor optimization. */
+       val = tdb_access_read(tdb, base + start * sizeof(tdb_off_t),
+                             (end - start) * sizeof(tdb_off_t), false);
+       if (TDB_PTR_IS_ERR(val)) {
+               return TDB_PTR_ERR(val);
        }
 
-       for (i = 0; i < num; i++) {
+       for (i = 0; i < (end - start); i++) {
                if (val[i])
                        break;
        }
-       if (unlikely(alloc))
-               free(val);
-       return i;
+       tdb_access_release(tdb, val);
+       return start + i;
 }
 
-/* Return first zero offset in num offset array, or num. */
+/* Return first zero offset in num offset array, or num, or -ve error. */
 uint64_t tdb_find_zero_off(struct tdb_context *tdb, tdb_off_t off,
                           uint64_t num)
 {
-       uint64_t i, *val;
-       bool alloc = false;
+       uint64_t i;
+       const uint64_t *val;
 
-       val = tdb_direct(tdb, off, num * sizeof(tdb_off_t));
-       if (!unlikely(val)) {
-               val = tdb_alloc_read(tdb, off, num * sizeof(tdb_off_t));
-               if (!val)
-                       return num;
-               alloc = true;
+       /* Zero vs non-zero is the same unconverted: minor optimization. */
+       val = tdb_access_read(tdb, off, num * sizeof(tdb_off_t), false);
+       if (TDB_PTR_IS_ERR(val)) {
+               return TDB_PTR_ERR(val);
        }
 
        for (i = 0; i < num; i++) {
                if (!val[i])
                        break;
        }
-       if (unlikely(alloc))
-               free(val);
+       tdb_access_release(tdb, val);
        return i;
 }
 
-static int fill(struct tdb_context *tdb,
-               const void *buf, size_t size,
-               tdb_off_t off, tdb_len_t len)
+enum TDB_ERROR zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
 {
-       while (len) {
-               size_t n = len > size ? size : len;
+       char buf[8192] = { 0 };
+       void *p = tdb->methods->direct(tdb, off, len, true);
+       enum TDB_ERROR ecode = TDB_SUCCESS;
 
-               if (!tdb_pwrite_all(tdb->fd, buf, n, off)) {
-                       tdb->ecode = TDB_ERR_IO;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "fill write failed: giving up!\n");
-                       return -1;
-               }
-               len -= n;
-               off += n;
+       assert(!(tdb->flags & TDB_RDONLY));
+       if (TDB_PTR_IS_ERR(p)) {
+               return TDB_PTR_ERR(p);
        }
-       return 0;
-}
-
-int zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
-{
-       void *p = tdb_direct(tdb, off, len);
        if (p) {
                memset(p, 0, len);
-               return 0;
-       } else {
-               char buf[8192] = { 0 };
-               return fill(tdb, buf, sizeof(buf), off, len);
+               return ecode;
        }
-}
-
-tdb_off_t tdb_read_off(struct tdb_context *tdb, tdb_off_t off)
-{
-       tdb_off_t pad, *ret;
-
-       ret = tdb_get(tdb, off, &pad, sizeof(pad));
-       if (!ret) {
-               return TDB_OFF_ERR;
-       }
-       return *ret;
-}
-
-/* Even on files, we can get partial writes due to signals. */
-bool tdb_pwrite_all(int fd, const void *buf, size_t len, tdb_off_t off)
-{
        while (len) {
-               ssize_t ret;
-               ret = pwrite(fd, buf, len, off);
-               if (ret < 0)
-                       return false;
-               if (ret == 0) {
-                       errno = ENOSPC;
-                       return false;
+               unsigned todo = len < sizeof(buf) ? len : sizeof(buf);
+               ecode = tdb->methods->twrite(tdb, off, buf, todo);
+               if (ecode != TDB_SUCCESS) {
+                       break;
                }
-               buf = (char *)buf + ret;
-               off += ret;
-               len -= ret;
+               len -= todo;
+               off += todo;
        }
-       return true;
+       return ecode;
 }
 
-/* Even on files, we can get partial reads due to signals. */
-bool tdb_pread_all(int fd, void *buf, size_t len, tdb_off_t off)
+tdb_off_t tdb_read_off(struct tdb_context *tdb, tdb_off_t off)
 {
-       while (len) {
-               ssize_t ret;
-               ret = pread(fd, buf, len, off);
-               if (ret < 0)
-                       return false;
-               if (ret == 0) {
-                       /* ETOOSHORT? */
-                       errno = EWOULDBLOCK;
-                       return false;
+       tdb_off_t ret;
+       enum TDB_ERROR ecode;
+
+       if (likely(!(tdb->flags & TDB_CONVERT))) {
+               tdb_off_t *p = tdb->methods->direct(tdb, off, sizeof(*p),
+                                                   false);
+               if (TDB_PTR_IS_ERR(p)) {
+                       return TDB_PTR_ERR(p);
                }
-               buf = (char *)buf + ret;
-               off += ret;
-               len -= ret;
+               if (p)
+                       return *p;
        }
-       return true;
-}
 
-bool tdb_read_all(int fd, void *buf, size_t len)
-{
-       while (len) {
-               ssize_t ret;
-               ret = read(fd, buf, len);
-               if (ret < 0)
-                       return false;
-               if (ret == 0) {
-                       /* ETOOSHORT? */
-                       errno = EWOULDBLOCK;
-                       return false;
-               }
-               buf = (char *)buf + ret;
-               len -= ret;
+       ecode = tdb_read_convert(tdb, off, &ret, sizeof(ret));
+       if (ecode != TDB_SUCCESS) {
+               return ecode;
        }
-       return true;
+       return ret;
 }
 
 /* write a lump of data at a specified offset */
-static int tdb_write(struct tdb_context *tdb, tdb_off_t off, 
-                    const void *buf, tdb_len_t len)
+static enum TDB_ERROR tdb_write(struct tdb_context *tdb, tdb_off_t off,
+                               const void *buf, tdb_len_t len)
 {
-       if (len == 0) {
-               return 0;
-       }
+       enum TDB_ERROR ecode;
 
-       if (tdb->read_only) {
-               tdb->ecode = TDB_ERR_RDONLY;
-               return -1;
+       if (tdb->flags & TDB_RDONLY) {
+               return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
+                                 "Write to read-only database");
        }
 
-       if (tdb->methods->oob(tdb, off + len, 0) != 0)
-               return -1;
+       ecode = tdb->methods->oob(tdb, off + len, false);
+       if (ecode != TDB_SUCCESS) {
+               return ecode;
+       }
 
-       if (tdb->map_ptr) {
-               memcpy(off + (char *)tdb->map_ptr, buf, len);
+       if (tdb->file->map_ptr) {
+               memcpy(off + (char *)tdb->file->map_ptr, buf, len);
        } else {
-               if (!tdb_pwrite_all(tdb->fd, buf, len, off)) {
-                       tdb->ecode = TDB_ERR_IO;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "tdb_write failed at %llu len=%llu (%s)\n",
-                                off, len, strerror(errno));
-                       return -1;
+               ssize_t ret;
+               ret = pwrite(tdb->file->fd, buf, len, off);
+               if (ret != len) {
+                       /* This shouldn't happen: we avoid sparse files. */
+                       if (ret >= 0)
+                               errno = ENOSPC;
+
+                       return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                                         "tdb_write: %zi at %zu len=%zu (%s)",
+                                         ret, (size_t)off, (size_t)len,
+                                         strerror(errno));
                }
        }
-       return 0;
+       return TDB_SUCCESS;
 }
 
 /* read a lump of data at a specified offset */
-static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
-                   tdb_len_t len)
+static enum TDB_ERROR tdb_read(struct tdb_context *tdb, tdb_off_t off,
+                              void *buf, tdb_len_t len)
 {
-       if (tdb->methods->oob(tdb, off + len, 0) != 0) {
-               return -1;
+       enum TDB_ERROR ecode;
+
+       ecode = tdb->methods->oob(tdb, off + len, false);
+       if (ecode != TDB_SUCCESS) {
+               return ecode;
        }
 
-       if (tdb->map_ptr) {
-               memcpy(buf, off + (char *)tdb->map_ptr, len);
+       if (tdb->file->map_ptr) {
+               memcpy(buf, off + (char *)tdb->file->map_ptr, len);
        } else {
-               if (!tdb_pread_all(tdb->fd, buf, len, off)) {
-                       /* Ensure ecode is set for log fn. */
-                       tdb->ecode = TDB_ERR_IO;
-                       tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
-                                "tdb_read failed at %lld "
-                                "len=%lld (%s) map_size=%lld\n",
-                                (long long)off, (long long)len,
-                                strerror(errno),
-                                (long long)tdb->map_size);
-                       return -1;
+               ssize_t r = pread(tdb->file->fd, buf, len, off);
+               if (r != len) {
+                       return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                                         "tdb_read failed with %zi at %zu "
+                                         "len=%zu (%s) map_size=%zu",
+                                         r, (size_t)off, (size_t)len,
+                                         strerror(errno),
+                                         (size_t)tdb->file->map_size);
                }
        }
-       return 0;
+       return TDB_SUCCESS;
 }
 
-int tdb_write_convert(struct tdb_context *tdb, tdb_off_t off,
-                     void *rec, size_t len)
+enum TDB_ERROR tdb_write_convert(struct tdb_context *tdb, tdb_off_t off,
+                                const void *rec, size_t len)
 {
-       return tdb->methods->write(tdb, off, tdb_convert(tdb, rec, len), len);
+       enum TDB_ERROR ecode;
+
+       if (unlikely((tdb->flags & TDB_CONVERT))) {
+               void *conv = malloc(len);
+               if (!conv) {
+                       return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
+                                         "tdb_write: no memory converting"
+                                         " %zu bytes", len);
+               }
+               memcpy(conv, rec, len);
+               ecode = tdb->methods->twrite(tdb, off,
+                                          tdb_convert(tdb, conv, len), len);
+               free(conv);
+       } else {
+               ecode = tdb->methods->twrite(tdb, off, rec, len);
+       }
+       return ecode;
 }
 
-int tdb_read_convert(struct tdb_context *tdb, tdb_off_t off,
-                     void *rec, size_t len)
+enum TDB_ERROR tdb_read_convert(struct tdb_context *tdb, tdb_off_t off,
+                               void *rec, size_t len)
 {
-       int ret = tdb->methods->read(tdb, off, rec, len);
+       enum TDB_ERROR ecode = tdb->methods->tread(tdb, off, rec, len);
        tdb_convert(tdb, rec, len);
-       return ret;
+       return ecode;
 }
 
-int tdb_write_off(struct tdb_context *tdb, tdb_off_t off, tdb_off_t val)
+enum TDB_ERROR tdb_write_off(struct tdb_context *tdb,
+                            tdb_off_t off, tdb_off_t val)
 {
+       if (tdb->flags & TDB_RDONLY) {
+               return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
+                                 "Write to read-only database");
+       }
+
+       if (likely(!(tdb->flags & TDB_CONVERT))) {
+               tdb_off_t *p = tdb->methods->direct(tdb, off, sizeof(*p),
+                                                   true);
+               if (TDB_PTR_IS_ERR(p)) {
+                       return TDB_PTR_ERR(p);
+               }
+               if (p) {
+                       *p = val;
+                       return TDB_SUCCESS;
+               }
+       }
        return tdb_write_convert(tdb, off, &val, sizeof(val));
 }
 
-/* read a lump of data, allocating the space for it */
-void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
+static void *_tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset,
+                            tdb_len_t len, unsigned int prefix)
 {
-       void *buf;
+       unsigned char *buf;
+       enum TDB_ERROR ecode;
 
        /* some systems don't like zero length malloc */
-       buf = malloc(len ? len : 1);
-       if (unlikely(!buf)) {
-               tdb->ecode = TDB_ERR_OOM;
-               tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
-                        "tdb_alloc_read malloc failed len=%lld\n",
-                        (long long)len);
-       } else if (unlikely(tdb->methods->read(tdb, offset, buf, len))) {
-               free(buf);
-               buf = NULL;
+       buf = malloc(prefix + len ? prefix + len : 1);
+       if (!buf) {
+               tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_USE_ERROR,
+                          "tdb_alloc_read malloc failed len=%zu",
+                          (size_t)(prefix + len));
+               return TDB_ERR_PTR(TDB_ERR_OOM);
+       } else {
+               ecode = tdb->methods->tread(tdb, offset, buf+prefix, len);
+               if (unlikely(ecode != TDB_SUCCESS)) {
+                       free(buf);
+                       return TDB_ERR_PTR(ecode);
+               }
        }
        return buf;
 }
 
-uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
+/* read a lump of data, allocating the space for it */
+void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
 {
-       struct tdb_used_record pad, *r;
-       void *key;
-       uint64_t klen, hash;
-
-       r = tdb_get(tdb, off, &pad, sizeof(pad));
-       if (!r)
-               /* FIXME */
-               return 0;
-
-       klen = rec_key_length(r);
-       key = tdb_direct(tdb, off + sizeof(pad), klen);
-       if (likely(key))
-               return tdb_hash(tdb, key, klen);
-
-       key = tdb_alloc_read(tdb, off + sizeof(pad), klen);
-       if (unlikely(!key))
-               return 0;
-       hash = tdb_hash(tdb, key, klen);
-       free(key);
-       return hash;
+       return _tdb_alloc_read(tdb, offset, len, 0);
 }
 
-/* Give a piece of tdb data to a parser */
-int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
-                  tdb_off_t offset, tdb_len_t len,
-                  int (*parser)(TDB_DATA key, TDB_DATA data,
-                                void *private_data),
-                  void *private_data)
+static enum TDB_ERROR fill(struct tdb_context *tdb,
+                          const void *buf, size_t size,
+                          tdb_off_t off, tdb_len_t len)
 {
-       TDB_DATA data;
-       int result;
-       bool allocated = false;
-
-       data.dsize = len;
-       data.dptr = tdb_direct(tdb, offset, len);
-       if (unlikely(!data.dptr)) {
-               if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
-                       return -1;
+       while (len) {
+               size_t n = len > size ? size : len;
+               ssize_t ret = pwrite(tdb->file->fd, buf, n, off);
+               if (ret != n) {
+                       if (ret >= 0)
+                               errno = ENOSPC;
+
+                       return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
+                                         "fill failed:"
+                                         " %zi at %zu len=%zu (%s)",
+                                         ret, (size_t)off, (size_t)len,
+                                         strerror(errno));
                }
-               allocated = true;
+               len -= n;
+               off += n;
        }
-       result = parser(key, data, private_data);
-       if (unlikely(allocated))
-               free(data.dptr);
-       return result;
+       return TDB_SUCCESS;
 }
 
 /* expand a file.  we prefer to use ftruncate, as that is what posix
   says to use for mmap expansion */
-static int tdb_expand_file(struct tdb_context *tdb, tdb_len_t addition)
+static enum TDB_ERROR tdb_expand_file(struct tdb_context *tdb,
+                                     tdb_len_t addition)
 {
        char buf[8192];
+       enum TDB_ERROR ecode;
 
-       if (tdb->read_only) {
-               tdb->ecode = TDB_ERR_RDONLY;
-               return -1;
+       if (tdb->flags & TDB_RDONLY) {
+               return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
+                                 "Expand on read-only database");
        }
 
        if (tdb->flags & TDB_INTERNAL) {
-               char *new = realloc(tdb->map_ptr, tdb->map_size + addition);
+               char *new = realloc(tdb->file->map_ptr,
+                                   tdb->file->map_size + addition);
                if (!new) {
-                       tdb->ecode = TDB_ERR_OOM;
-                       return -1;
+                       return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
+                                         "No memory to expand database");
                }
-               tdb->map_ptr = new;
-               tdb->map_size += addition;
+               tdb->file->map_ptr = new;
+               tdb->file->map_size += addition;
        } else {
                /* Unmap before trying to write; old TDB claimed OpenBSD had
                 * problem with this otherwise. */
-               tdb_munmap(tdb);
+               tdb_munmap(tdb->file);
 
                /* If this fails, we try to fill anyway. */
-               if (ftruncate(tdb->fd, tdb->map_size + addition))
+               if (ftruncate(tdb->file->fd, tdb->file->map_size + addition))
                        ;
 
                /* now fill the file with something. This ensures that the
                   file isn't sparse, which would be very bad if we ran out of
                   disk. This must be done with write, not via mmap */
                memset(buf, 0x43, sizeof(buf));
-               if (fill(tdb, buf, sizeof(buf), tdb->map_size, addition) == -1)
-                       return -1;
-               tdb->map_size += addition;
+               ecode = fill(tdb, buf, sizeof(buf), tdb->file->map_size,
+                            addition);
+               if (ecode != TDB_SUCCESS)
+                       return ecode;
+               tdb->file->map_size += addition;
                tdb_mmap(tdb);
        }
-       return 0;
+       return TDB_SUCCESS;
 }
 
 const void *tdb_access_read(struct tdb_context *tdb,
-                           tdb_off_t off, tdb_len_t len)
+                           tdb_off_t off, tdb_len_t len, bool convert)
 {
-       const void *ret = tdb_direct(tdb, off, len);
+       void *ret = NULL;
 
-       if (!ret)
-               ret = tdb_alloc_read(tdb, off, len);
-       return ret;
-}
+       if (likely(!(tdb->flags & TDB_CONVERT))) {
+               ret = tdb->methods->direct(tdb, off, len, false);
 
-void tdb_access_release(struct tdb_context *tdb, const void *p)
-{
-       if (!tdb->map_ptr
-           || (char *)p < (char *)tdb->map_ptr
-           || (char *)p >= (char *)tdb->map_ptr + tdb->map_size)
-               free((void *)p);
+               if (TDB_PTR_IS_ERR(ret)) {
+                       return ret;
+               }
+       }
+       if (!ret) {
+               struct tdb_access_hdr *hdr;
+               hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
+               if (TDB_PTR_IS_ERR(hdr)) {
+                       return hdr;
+               }
+               hdr->next = tdb->access;
+               tdb->access = hdr;
+               ret = hdr + 1;
+               if (convert) {
+                       tdb_convert(tdb, (void *)ret, len);
+               }
+       } else
+               tdb->direct_access++;
+
+       return ret;
 }
 
-#if 0
-/* write a lump of data at a specified offset */
-static int tdb_write(struct tdb_context *tdb, tdb_off_t off, 
-                    const void *buf, tdb_len_t len)
+void *tdb_access_write(struct tdb_context *tdb,
+                      tdb_off_t off, tdb_len_t len, bool convert)
 {
-       if (len == 0) {
-               return 0;
-       }
+       void *ret = NULL;
 
-       if (tdb->read_only || tdb->traverse_read) {
-               tdb->ecode = TDB_ERR_RDONLY;
-               return -1;
+       if (tdb->flags & TDB_RDONLY) {
+               tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
+                          "Write to read-only database");
+               return TDB_ERR_PTR(TDB_ERR_RDONLY);
        }
 
-       if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
-               return -1;
+       if (likely(!(tdb->flags & TDB_CONVERT))) {
+               ret = tdb->methods->direct(tdb, off, len, true);
 
-       if (tdb->map_ptr) {
-               memcpy(off + (char *)tdb->map_ptr, buf, len);
-       } else {
-               ssize_t written = pwrite(tdb->fd, buf, len, off);
-               if ((written != (ssize_t)len) && (written != -1)) {
-                       /* try once more */
-                       tdb->ecode = TDB_ERR_IO;
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_write: wrote only "
-                                "%d of %d bytes at %d, trying once more\n",
-                                (int)written, len, off));
-                       written = pwrite(tdb->fd, (const char *)buf+written,
-                                        len-written,
-                                        off+written);
-               }
-               if (written == -1) {
-                       /* Ensure ecode is set for log fn. */
-                       tdb->ecode = TDB_ERR_IO;
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d "
-                                "len=%d (%s)\n", off, len, strerror(errno)));
-                       return -1;
-               } else if (written != (ssize_t)len) {
-                       tdb->ecode = TDB_ERR_IO;
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_write: failed to "
-                                "write %d bytes at %d in two attempts\n",
-                                len, off));
-                       return -1;
+               if (TDB_PTR_IS_ERR(ret)) {
+                       return ret;
                }
        }
-       return 0;
-}
 
+       if (!ret) {
+               struct tdb_access_hdr *hdr;
+               hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
+               if (TDB_PTR_IS_ERR(hdr)) {
+                       return hdr;
+               }
+               hdr->next = tdb->access;
+               tdb->access = hdr;
+               hdr->off = off;
+               hdr->len = len;
+               hdr->convert = convert;
+               ret = hdr + 1;
+               if (convert)
+                       tdb_convert(tdb, (void *)ret, len);
+       } else
+               tdb->direct_access++;
 
+       return ret;
+}
 
-/*
-  do an unlocked scan of the hash table heads to find the next non-zero head. The value
-  will then be confirmed with the lock held
-*/             
-static void tdb_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
+/*
+ * Search the tdb->access list for the header whose payload (the bytes that
+ * immediately follow the header) begins at p.
+ *
+ * Returns the address of the link pointing at that header, so the caller can
+ * unlink the entry in place, or NULL when no entry on the list matches p.
+ */
+static struct tdb_access_hdr **find_hdr(struct tdb_context *tdb, const void *p)
 {
-       uint32_t h = *chain;
-       if (tdb->map_ptr) {
-               for (;h < tdb->header.hash_size;h++) {
-                       if (0 != *(uint32_t *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
-                               break;
-                       }
-               }
-       } else {
-               uint32_t off=0;
-               for (;h < tdb->header.hash_size;h++) {
-                       if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
-                               break;
-                       }
-               }
+       struct tdb_access_hdr **link = &tdb->access;
+
+       while (*link != NULL) {
+               /* The caller-visible buffer sits right after the header. */
+               const void *payload = *link + 1;
+
+               if (payload == p)
+                       return link;
+               link = &(*link)->next;
        }
-       (*chain) = h;
+       return NULL;
 }
 
-/* read/write a tdb_off_t */
-int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
+/*
+ * Give back an access pointer without writing anything.
+ *
+ * If p is the payload of an entry on the tdb->access list, that entry is
+ * unlinked and freed.  Otherwise p is treated as a direct access and the
+ * tdb->direct_access counter is decremented.
+ */
+void tdb_access_release(struct tdb_context *tdb, const void *p)
 {
-       return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
+       struct tdb_access_hdr **link = find_hdr(tdb, p);
+       struct tdb_access_hdr *victim;
+
+       if (link == NULL) {
+               /* Not on the list: balance the direct-access count. */
+               tdb->direct_access--;
+               return;
+       }
+
+       victim = *link;
+       *link = victim->next;
+       free(victim);
 }
 
-int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
+/*
+ * Finish a write access on p.
+ *
+ * If p is the payload of an entry on the tdb->access list, its hdr->len
+ * bytes are written back at hdr->off (through tdb_write_convert() when the
+ * entry was flagged for conversion, tdb_write() otherwise); the entry is
+ * then unlinked and freed whether or not the write succeeded, and the
+ * write's result is returned.
+ *
+ * If p is not on the list it is treated as a direct access: the
+ * tdb->direct_access counter is decremented and TDB_SUCCESS is returned.
+ */
+enum TDB_ERROR tdb_access_commit(struct tdb_context *tdb, void *p)
 {
-       tdb_off_t off = *d;
-       return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
-}
+       struct tdb_access_hdr **link = find_hdr(tdb, p);
+       struct tdb_access_hdr *hdr;
+       enum TDB_ERROR result;
+
+       if (link == NULL) {
+               /* Not on the list: nothing to write back. */
+               tdb->direct_access--;
+               return TDB_SUCCESS;
+       }
+
+       hdr = *link;
+       result = hdr->convert
+               ? tdb_write_convert(tdb, hdr->off, p, hdr->len)
+               : tdb_write(tdb, hdr->off, p, hdr->len);
 
+       /* Drop the entry even when the write failed. */
+       *link = hdr->next;
+       free(hdr);
+       return result;
+}
 
-/* read/write a record */
-int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec)
+/*
+ * Return a pointer into the memory map covering [off, off + len).
+ *
+ * Returns NULL when the file has no mapping, an error pointer built with
+ * TDB_ERR_PTR() when tdb_oob() rejects off + len, and otherwise the mapped
+ * address of off.  write_mode is not consulted by this implementation.
+ */
+static void *tdb_direct(struct tdb_context *tdb, tdb_off_t off, size_t len,
+                       bool write_mode)
 {
-       if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
-               return -1;
-       if (TDB_BAD_MAGIC(rec)) {
-               /* Ensure ecode is set for log fn. */
-               tdb->ecode = TDB_ERR_CORRUPT;
-               TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
-               return -1;
-       }
-       return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
+       enum TDB_ERROR ecode;
+
+       if (unlikely(tdb->file->map_ptr == NULL))
+               return NULL;
+
+       ecode = tdb_oob(tdb, off + len, false);
+       if (likely(ecode == TDB_SUCCESS)) {
+               /*
+                * map_ptr is deliberately re-read after the bounds check.
+                * NOTE(review): presumably tdb_oob() can remap the file --
+                * confirm before caching the base pointer earlier.
+                */
+               return (char *)tdb->file->map_ptr + off;
+       }
+       return TDB_ERR_PTR(ecode);
 }
 
-int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec)
+/*
+ * Bump the sequence number stored in the header (struct tdb_header.seqnum).
+ *
+ * Without TDB_CONVERT the counter is updated in place through the methods'
+ * direct() accessor when that yields a usable pointer.  Otherwise the value
+ * is read with tdb_read_off(), incremented and stored with tdb_write_off().
+ * The counter is kept non-negative when viewed as int64_t.  Failures are not
+ * reported to the caller: on a read error the counter is left untouched.
+ */
+void tdb_inc_seqnum(struct tdb_context *tdb)
 {
-       struct tdb_record r = *rec;
-       return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
+       tdb_off_t seq;
+
+       if (likely(!(tdb->flags & TDB_CONVERT))) {
+               int64_t *direct;
+
+               direct = tdb->methods->direct(tdb,
+                                             offsetof(struct tdb_header,
+                                                      seqnum),
+                                             sizeof(*direct), true);
+               /*
+                * direct() can hand back an error pointer as well as NULL;
+                * only dereference a real mapping, otherwise fall through
+                * to the read/write path below.
+                */
+               if (likely(direct && !TDB_PTR_IS_ERR(direct))) {
+                       /*
+                        * Don't let it go negative, even briefly.  The
+                        * comparison must sit inside unlikely(), and the
+                        * addition is done unsigned so that the wrap check
+                        * does not itself rely on signed overflow.
+                        */
+                       if (unlikely((int64_t)((uint64_t)*direct + 1) < 0))
+                               *direct = 0;
+                       (*direct)++;
+                       return;
+               }
+       }
+
+       seq = tdb_read_off(tdb, offsetof(struct tdb_header, seqnum));
+       if (!TDB_OFF_IS_ERR(seq)) {
+               seq++;
+               if (unlikely((int64_t)seq < 0))
+                       seq = 0;
+               tdb_write_off(tdb, offsetof(struct tdb_header, seqnum), seq);
+       }
 }
-#endif
 
 static const struct tdb_methods io_methods = {
        tdb_read,
        tdb_write,
        tdb_oob,
        tdb_expand_file,
+       tdb_direct, /* NOTE(review): presumably fills the slot invoked as methods->direct() by tdb_inc_seqnum -- confirm against struct tdb_methods */
 };
 
 /*