License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "private.h"
+#include <assert.h>
#include <ccan/likely/likely.h>
void tdb_munmap(struct tdb_context *tdb)
if (tdb->flags & TDB_NOMMAP)
return;
- tdb->map_ptr = mmap(NULL, tdb->map_size,
- PROT_READ|(tdb->read_only? 0:PROT_WRITE),
+ tdb->map_ptr = mmap(NULL, tdb->map_size, tdb->mmap_flags,
MAP_SHARED, tdb->fd, 0);
/*
static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, bool probe)
{
struct stat st;
+ int ret;
+
+ /* We can't hold pointers during this: we could unmap! */
+ assert(!tdb->direct_access
+ || (tdb->flags & TDB_NOLOCK)
+ || tdb_has_expansion_lock(tdb));
+
if (len <= tdb->map_size)
return 0;
if (tdb->flags & TDB_INTERNAL) {
return -1;
}
- if (fstat(tdb->fd, &st) == -1) {
+ if (tdb_lock_expand(tdb, F_RDLCK) != 0)
+ return -1;
+
+ ret = fstat(tdb->fd, &st);
+
+ tdb_unlock_expand(tdb, F_RDLCK);
+
+ if (ret == -1) {
tdb->ecode = TDB_ERR_IO;
return -1;
}
/* Unmap, update size, remap */
tdb_munmap(tdb);
+
tdb->map_size = st.st_size;
tdb_mmap(tdb);
return 0;
}
-static void *tdb_direct(struct tdb_context *tdb, tdb_off_t off, size_t len)
-{
- if (unlikely(!tdb->map_ptr))
- return NULL;
-
- /* FIXME: We can do a subset of this! */
- if (tdb->transaction)
- return NULL;
-
- if (unlikely(tdb_oob(tdb, off + len, true) == -1))
- return NULL;
- return (char *)tdb->map_ptr + off;
-}
-
-/* Either make a copy into pad and return that, or return ptr into mmap. */
-/* Note: pad has to be a real object, so we can't get here if len
- * overflows size_t */
-void *tdb_get(struct tdb_context *tdb, tdb_off_t off, void *pad, size_t len)
-{
- if (likely(!(tdb->flags & TDB_CONVERT))) {
- void *ret = tdb_direct(tdb, off, len);
- if (ret)
- return ret;
- }
-
- if (unlikely(tdb_oob(tdb, off + len, false) == -1))
- return NULL;
-
- if (tdb->methods->read(tdb, off, pad, len) == -1)
- return NULL;
- return tdb_convert(tdb, pad, len);
-}
-
/* Endian conversion: we only ever deal with 8 byte quantities */
void *tdb_convert(const struct tdb_context *tdb, void *buf, tdb_len_t size)
{
- if (unlikely((tdb->flags & TDB_CONVERT))) {
+ if (unlikely((tdb->flags & TDB_CONVERT)) && buf) {
uint64_t i, *p = (uint64_t *)buf;
for (i = 0; i < size / 8; i++)
p[i] = bswap_64(p[i]);
return buf;
}
-/* Return first non-zero offset in num offset array, or num. */
/* FIXME: Return the off? */
-uint64_t tdb_find_nonzero_off(struct tdb_context *tdb, tdb_off_t off,
- uint64_t num)
+uint64_t tdb_find_nonzero_off(struct tdb_context *tdb,
+ tdb_off_t base, uint64_t start, uint64_t end)
{
- uint64_t i, *val;
- bool alloc = false;
-
- val = tdb_direct(tdb, off, num * sizeof(tdb_off_t));
- if (!unlikely(val)) {
- val = tdb_alloc_read(tdb, off, num * sizeof(tdb_off_t));
- if (!val)
- return num;
- alloc = true;
- }
+ uint64_t i; /* index relative to start, not to base */
+ const uint64_t *val;
- for (i = 0; i < num; i++) {
+ /* Zero vs non-zero is the same unconverted: minor optimization. */
+ val = tdb_access_read(tdb, base + start * sizeof(tdb_off_t),
+ (end - start) * sizeof(tdb_off_t), false);
+ if (!val)
+ return end; /* read failure looks the same as "no non-zero entry" */
+
+ for (i = 0; i < (end - start); i++) {
if (val[i])
break;
}
+ tdb_access_release(tdb, val);
+ return start + i; /* entry index within the array at base; == end if all were zero */
}
/* Return first zero offset in num offset array, or num. */
uint64_t tdb_find_zero_off(struct tdb_context *tdb, tdb_off_t off,
uint64_t num)
{
- uint64_t i, *val;
- bool alloc = false;
-
- val = tdb_direct(tdb, off, num * sizeof(tdb_off_t));
- if (!unlikely(val)) {
- val = tdb_alloc_read(tdb, off, num * sizeof(tdb_off_t));
- if (!val)
- return num;
- alloc = true;
- }
+ uint64_t i;
+ const uint64_t *val; /* direct pointer or heap copy; either way released below */
+
+ /* Zero vs non-zero is the same unconverted: minor optimization. */
+ val = tdb_access_read(tdb, off, num * sizeof(tdb_off_t), false);
+ if (!val)
+ return num; /* read failure looks the same as "no zero entry" */
for (i = 0; i < num; i++) {
if (!val[i])
break;
}
+ tdb_access_release(tdb, val); /* pairs with the tdb_access_read() above */
return i;
}
-static int fill(struct tdb_context *tdb,
- const void *buf, size_t size,
- tdb_off_t off, tdb_len_t len)
+int zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
{
- while (len) {
- size_t n = len > size ? size : len;
+ char buf[8192] = { 0 }; /* block of zeroes for the write() fallback below */
+ void *p = tdb->methods->direct(tdb, off, len); /* may be NULL: then fall back to write() */
- if (!tdb_pwrite_all(tdb->fd, buf, n, off)) {
- tdb->ecode = TDB_ERR_IO;
- tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
- "fill write failed: giving up!\n");
- return -1;
- }
- len -= n;
- off += n;
+ if (tdb->read_only) {
+ tdb->ecode = TDB_ERR_RDONLY;
+ return -1; /* refuse before touching either the mapping or the file */
}
- return 0;
-}
-int zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
-{
- void *p = tdb_direct(tdb, off, len);
if (p) {
memset(p, 0, len);
return 0;
- } else {
- char buf[8192] = { 0 };
- return fill(tdb, buf, sizeof(buf), off, len);
}
+ while (len) {
+ unsigned todo = len < sizeof(buf) ? len : sizeof(buf); /* at most one buffer per write */
+ if (tdb->methods->write(tdb, off, buf, todo) == -1)
+ return -1;
+ len -= todo;
+ off += todo;
+ }
+ return 0;
}
tdb_off_t tdb_read_off(struct tdb_context *tdb, tdb_off_t off)
{
- tdb_off_t pad, *ret;
+ tdb_off_t ret; /* only filled in on the copying path below */
- ret = tdb_get(tdb, off, &pad, sizeof(pad));
- if (!ret) {
- return TDB_OFF_ERR;
+ if (likely(!(tdb->flags & TDB_CONVERT))) { /* fast path: no byte-swapping needed */
+ tdb_off_t *p = tdb->methods->direct(tdb, off, sizeof(*p));
+ if (p)
+ return *p; /* read in place through the direct pointer */
}
- return *ret;
+
+ if (tdb_read_convert(tdb, off, &ret, sizeof(ret)) == -1)
+ return TDB_OFF_ERR;
+ return ret;
}
/* Even on files, we can get partial writes due to signals. */
tdb->ecode = TDB_ERR_IO;
tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
"tdb_write failed at %llu len=%llu (%s)\n",
- off, len, strerror(errno));
+ (long long)off, (long long)len,
+ strerror(errno));
return -1;
}
}
}
int tdb_write_convert(struct tdb_context *tdb, tdb_off_t off,
- void *rec, size_t len)
+ const void *rec, size_t len)
{
- return tdb->methods->write(tdb, off, tdb_convert(tdb, rec, len), len);
+ int ret;
+ if (unlikely((tdb->flags & TDB_CONVERT))) {
+ /* rec is const: byte-swap a private copy, never the caller's data.
+ * malloc(0) may legitimately return NULL, so never request 0 bytes
+ * (same guard as _tdb_alloc_read). */
+ void *conv = malloc(len ? len : 1);
+ if (!conv) {
+ tdb->ecode = TDB_ERR_OOM;
+ tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+ "tdb_write: no memory converting %zu bytes\n",
+ len);
+ return -1;
+ }
+ memcpy(conv, rec, len);
+ ret = tdb->methods->write(tdb, off,
+ tdb_convert(tdb, conv, len), len);
+ free(conv);
+ } else
+ ret = tdb->methods->write(tdb, off, rec, len); /* no conversion: write as-is */
+
+ return ret;
}
int tdb_read_convert(struct tdb_context *tdb, tdb_off_t off,
int tdb_write_off(struct tdb_context *tdb, tdb_off_t off, tdb_off_t val)
{
+ if (tdb->read_only) {
+ tdb->ecode = TDB_ERR_RDONLY;
+ return -1; /* checked first so the in-place store below is never reached */
+ }
+
+ if (likely(!(tdb->flags & TDB_CONVERT))) { /* fast path: no byte-swapping needed */
+ tdb_off_t *p = tdb->methods->direct(tdb, off, sizeof(*p));
+ if (p) {
+ *p = val; /* store in place through the direct pointer */
+ return 0;
+ }
+ }
return tdb_write_convert(tdb, off, &val, sizeof(val));
}
-/* read a lump of data, allocating the space for it */
-void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
+/* Allocate prefix + len bytes and read len bytes from offset into the
+ * buffer just past the prefix.  Returns the start of the allocation
+ * (to be freed by the caller), or NULL if malloc or the read fails. */
+static void *_tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset,
+ tdb_len_t len, unsigned int prefix)
{
void *buf;
/* some systems don't like zero length malloc */
- buf = malloc(len ? len : 1);
+ buf = malloc(prefix + len ? prefix + len : 1);
if (unlikely(!buf)) {
tdb->ecode = TDB_ERR_OOM;
tdb->log(tdb, TDB_DEBUG_ERROR, tdb->log_priv,
"tdb_alloc_read malloc failed len=%lld\n",
- (long long)len);
- } else if (unlikely(tdb->methods->read(tdb, offset, buf, len))) {
+ (long long)(prefix + len)); /* cast the whole sum so the argument matches %lld */
+ } else if (unlikely(tdb->methods->read(tdb, offset,
+ (char *)buf + prefix, len))) { /* void * arithmetic is not ISO C */
free(buf);
buf = NULL;
}
return buf;
}
-uint64_t hash_record(struct tdb_context *tdb, tdb_off_t off)
+/* Read len bytes at offset into a freshly malloc'ed buffer; NULL on failure. */
+void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
{
- struct tdb_used_record pad, *r;
- void *key;
- uint64_t klen, hash;
-
- r = tdb_get(tdb, off, &pad, sizeof(pad));
- if (!r)
- /* FIXME */
- return 0;
-
- klen = rec_key_length(r);
- key = tdb_direct(tdb, off + sizeof(pad), klen);
- if (likely(key))
- return tdb_hash(tdb, key, klen);
-
- key = tdb_alloc_read(tdb, off + sizeof(pad), klen);
- if (unlikely(!key))
- return 0;
- hash = tdb_hash(tdb, key, klen);
- free(key);
- return hash;
+ return _tdb_alloc_read(tdb, offset, len, 0); /* prefix 0: no header space in front */
}
-/* Give a piece of tdb data to a parser */
-int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
- tdb_off_t offset, tdb_len_t len,
- int (*parser)(TDB_DATA key, TDB_DATA data,
- void *private_data),
- void *private_data)
+static int fill(struct tdb_context *tdb,
+ const void *buf, size_t size,
+ tdb_off_t off, tdb_len_t len)
{
- TDB_DATA data;
- int result;
- bool allocated = false;
-
- data.dsize = len;
- data.dptr = tdb_direct(tdb, offset, len);
- if (unlikely(!data.dptr)) {
- if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
+ while (len) { /* repeat the size-byte pattern in buf until len bytes are written */
+ size_t n = len > size ? size : len; /* min(len, size) */
+
+ if (!tdb_pwrite_all(tdb->fd, buf, n, off)) { /* goes straight to the fd, not via tdb->methods */
+ tdb->ecode = TDB_ERR_IO;
+ tdb->log(tdb, TDB_DEBUG_FATAL, tdb->log_priv,
+ "fill write failed: giving up!\n");
return -1;
}
- allocated = true;
+ len -= n;
+ off += n;
}
- result = parser(key, data, private_data);
- if (unlikely(allocated))
- free(data.dptr);
- return result;
+ return 0;
}
/* expand a file. we prefer to use ftruncate, as that is what posix
file isn't sparse, which would be very bad if we ran out of
disk. This must be done with write, not via mmap */
memset(buf, 0x43, sizeof(buf));
- if (fill(tdb, buf, sizeof(buf), tdb->map_size, addition) == -1)
+ if (0 || fill(tdb, buf, sizeof(buf), tdb->map_size, addition) == -1)
return -1;
tdb->map_size += addition;
tdb_mmap(tdb);
return 0;
}
+/* This is only needed for tdb_access_commit, but used everywhere to simplify. */
+struct tdb_access_hdr {
+ tdb_off_t off; /* database offset the copy came from; set by tdb_access_write */
+ tdb_len_t len; /* length in bytes of the data that follows this header */
+ bool convert; /* if set, commit goes through tdb_write_convert() */
+};
+
const void *tdb_access_read(struct tdb_context *tdb,
- tdb_off_t off, tdb_len_t len)
+ tdb_off_t off, tdb_len_t len, bool convert)
{
- const void *ret = tdb_direct(tdb, off, len);
+ const void *ret = NULL;
- if (!ret)
- ret = tdb_alloc_read(tdb, off, len);
- return ret;
-}
+ if (likely(!(tdb->flags & TDB_CONVERT))) /* in-place access only when no byte-swapping is needed */
+ ret = tdb->methods->direct(tdb, off, len);
-void tdb_access_release(struct tdb_context *tdb, const void *p)
-{
- if (!tdb->map_ptr
- || (char *)p < (char *)tdb->map_ptr
- || (char *)p >= (char *)tdb->map_ptr + tdb->map_size)
- free((void *)p);
+ if (!ret) {
+ struct tdb_access_hdr *hdr;
+ hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr)); /* heap copy; header space reserved but not filled in here */
+ if (hdr) {
+ ret = hdr + 1; /* caller sees the data, which starts right after the header */
+ if (convert)
+ tdb_convert(tdb, (void *)ret, len);
+ }
+ } else
+ tdb->direct_access++; /* outstanding direct pointer; dropped by tdb_access_release() */
+
+ return ret;
}
-#if 0
-/* write a lump of data at a specified offset */
-static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
- const void *buf, tdb_len_t len)
+void *tdb_access_write(struct tdb_context *tdb,
+ tdb_off_t off, tdb_len_t len, bool convert)
{
- if (len == 0) {
- return 0;
- }
+ void *ret = NULL;
- if (tdb->read_only || tdb->traverse_read) {
+ if (tdb->read_only) {
tdb->ecode = TDB_ERR_RDONLY;
- return -1;
+ return NULL; /* no writable access on a read-only database */
}
- if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
- return -1;
+ if (likely(!(tdb->flags & TDB_CONVERT))) /* in-place access only when no byte-swapping is needed */
+ ret = tdb->methods->direct(tdb, off, len);
- if (tdb->map_ptr) {
- memcpy(off + (char *)tdb->map_ptr, buf, len);
- } else {
- ssize_t written = pwrite(tdb->fd, buf, len, off);
- if ((written != (ssize_t)len) && (written != -1)) {
- /* try once more */
- tdb->ecode = TDB_ERR_IO;
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_write: wrote only "
- "%d of %d bytes at %d, trying once more\n",
- (int)written, len, off));
- written = pwrite(tdb->fd, (const char *)buf+written,
- len-written,
- off+written);
- }
- if (written == -1) {
- /* Ensure ecode is set for log fn. */
- tdb->ecode = TDB_ERR_IO;
- TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d "
- "len=%d (%s)\n", off, len, strerror(errno)));
- return -1;
- } else if (written != (ssize_t)len) {
- tdb->ecode = TDB_ERR_IO;
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_write: failed to "
- "write %d bytes at %d in two attempts\n",
- len, off));
- return -1;
+ if (!ret) {
+ struct tdb_access_hdr *hdr;
+ hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr)); /* heap copy seeded with the current contents */
+ if (hdr) {
+ hdr->off = off; /* off, len, convert are read back by tdb_access_commit() */
+ hdr->len = len;
+ hdr->convert = convert;
+ ret = hdr + 1; /* caller edits the data, which starts right after the header */
+ if (convert)
+ tdb_convert(tdb, (void *)ret, len);
}
- }
- return 0;
-}
-
+ } else
+ tdb->direct_access++; /* outstanding direct pointer; dropped by commit/release */
+ return ret;
+}
-/*
- do an unlocked scan of the hash table heads to find the next non-zero head. The value
- will then be confirmed with the lock held
-*/
-static void tdb_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
+bool is_direct(const struct tdb_context *tdb, const void *p)
{
- uint32_t h = *chain;
- if (tdb->map_ptr) {
- for (;h < tdb->header.hash_size;h++) {
- if (0 != *(uint32_t *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
- break;
- }
- }
- } else {
- uint32_t off=0;
- for (;h < tdb->header.hash_size;h++) {
- if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
- break;
- }
- }
- }
- (*chain) = h;
+ return (tdb->map_ptr /* without a mapping nothing can be direct */
+ && (char *)p >= (char *)tdb->map_ptr
+ && (char *)p < (char *)tdb->map_ptr + tdb->map_size); /* half-open: [map_ptr, map_ptr + map_size) */
}
-/* read/write a tdb_off_t */
-int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
+void tdb_access_release(struct tdb_context *tdb, const void *p)
{
- return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
+ if (is_direct(tdb, p))
+ tdb->direct_access--; /* undo the increment made when the direct pointer was handed out */
+ else
+ free((struct tdb_access_hdr *)p - 1); /* the allocation starts at the header placed before p */
}
-int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
+/* Finish an access started with tdb_access_write(): a heap copy is written
+ * back to the database and freed; a direct pointer only drops the
+ * direct_access count.  Returns 0, or the result of the failed write. */
+int tdb_access_commit(struct tdb_context *tdb, void *p)
{
- tdb_off_t off = *d;
- return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
-}
+ int ret = 0;
+
+ /* Same direct-vs-copy test as tdb_access_release(). */
+ if (!is_direct(tdb, p)) {
+ /* Heap copy: tdb_access_write() stored its header just before p. */
+ struct tdb_access_hdr *hdr;
+
+ hdr = (struct tdb_access_hdr *)p - 1;
+ if (hdr->convert)
+ ret = tdb_write_convert(tdb, hdr->off, p, hdr->len);
+ else
+ /* Go through the method table, like tdb_write_convert(),
+ * so both branches reach the same backend. */
+ ret = tdb->methods->write(tdb, hdr->off, p, hdr->len);
+ free(hdr);
+ } else
+ tdb->direct_access--;
+
+ return ret;
+}
-/* read/write a record */
-int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec)
+static void *tdb_direct(struct tdb_context *tdb, tdb_off_t off, size_t len)
{
- if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
- return -1;
- if (TDB_BAD_MAGIC(rec)) {
- /* Ensure ecode is set for log fn. */
- tdb->ecode = TDB_ERR_CORRUPT;
- TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
- return -1;
- }
- return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
+ if (unlikely(!tdb->map_ptr))
+ return NULL; /* no mapping: callers fall back to read/write */
+
+ if (unlikely(tdb_oob(tdb, off + len, true) == -1)) /* third argument is tdb_oob's "probe" flag */
+ return NULL;
+ return (char *)tdb->map_ptr + off; /* pointer straight into the mapping */
}
-int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec)
+void add_stat_(struct tdb_context *tdb, uint64_t *stat, size_t val)
{
- struct tdb_record r = *rec;
- return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
+ if ((uintptr_t)stat < (uintptr_t)tdb->stats + tdb->stats->size) /* skip counters beyond stats->size bytes; dereferences tdb->stats unchecked */
+ *stat += val;
}
-#endif
static const struct tdb_methods io_methods = {
tdb_read,
tdb_write,
tdb_oob,
tdb_expand_file,
+ tdb_direct, /* presumably the slot called as tdb->methods->direct — confirm against struct tdb_methods */
};
/*