tdb2: use counters to decide when to coalesce records.
author Rusty Russell <rusty@rustcorp.com.au>
Wed, 27 Apr 2011 12:14:16 +0000 (21:44 +0930)
committer Rusty Russell <rusty@rustcorp.com.au>
Wed, 27 Apr 2011 12:14:16 +0000 (21:44 +0930)
This simply uses a 7-bit counter which gets incremented on each addition
to the list (but not decremented on removals).  When it wraps, we walk the
entire list looking for things to coalesce.

This causes performance problems, especially when appending records, so
we limit it in the next patch:

Before:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction 2000000
real 0m59.687s
user 0m11.593s
sys 0m4.100s
-rw------- 1 rusty rusty 752004064 2011-04-27 21:14 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK

real 1m17.738s
user 0m0.348s
sys 0m0.580s
-rw------- 1 rusty rusty 663360 2011-04-27 21:15 torture.tdb
Adding 2000000 records:  926 ns (110556088 bytes)
Finding 2000000 records:  592 ns (110556088 bytes)
Missing 2000000 records:  416 ns (110556088 bytes)
Traversing 2000000 records:  422 ns (110556088 bytes)
Deleting 2000000 records:  741 ns (244003768 bytes)
Re-adding 2000000 records:  799 ns (244003768 bytes)
Appending 2000000 records:  1147 ns (295244592 bytes)
Churning 2000000 records:  1827 ns (568411440 bytes)

After:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction 2000000
real 1m17.022s
user 0m27.206s
sys 0m3.920s
-rw------- 1 rusty rusty 570130576 2011-04-27 21:17 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK

real 1m27.355s
user 0m0.296s
sys 0m0.516s
-rw------- 1 rusty rusty 617352 2011-04-27 21:18 torture.tdb
Adding 2000000 records:  890 ns (110556088 bytes)
Finding 2000000 records:  565 ns (110556088 bytes)
Missing 2000000 records:  390 ns (110556088 bytes)
Traversing 2000000 records:  410 ns (110556088 bytes)
Deleting 2000000 records:  8623 ns (244003768 bytes)
Re-adding 2000000 records:  7089 ns (244003768 bytes)
Appending 2000000 records:  33708 ns (244003768 bytes)
Churning 2000000 records:  2029 ns (268404160 bytes)

ccan/tdb2/check.c
ccan/tdb2/free.c
ccan/tdb2/private.h
ccan/tdb2/tdb.c
ccan/tdb2/test/layout.c
ccan/tdb2/test/run-03-coalesce.c
ccan/tdb2/test/run-04-basichash.c
ccan/tdb2/test/run-64-bit-tdb.c
ccan/tdb2/transaction.c

index 88b86429cb97aed7a2486f53a7699626d7fc3041..52fb188764dfa0e2c85250e3658200db2fc67894 100644 (file)
@@ -533,11 +533,13 @@ static enum TDB_ERROR check_free_table(struct tdb_context *tdb,
 
                h = bucket_off(ftable_off, i);
                for (off = tdb_read_off(tdb, h); off; off = f.next) {
 
                h = bucket_off(ftable_off, i);
                for (off = tdb_read_off(tdb, h); off; off = f.next) {
-                       if (!first)
-                               first = off;
                        if (TDB_OFF_IS_ERR(off)) {
                                return off;
                        }
                        if (TDB_OFF_IS_ERR(off)) {
                                return off;
                        }
+                       if (!first) {
+                               off &= TDB_OFF_MASK;
+                               first = off;
+                       }
                        ecode = tdb_read_convert(tdb, off, &f, sizeof(f));
                        if (ecode != TDB_SUCCESS) {
                                return ecode;
                        ecode = tdb_read_convert(tdb, off, &f, sizeof(f));
                        if (ecode != TDB_SUCCESS) {
                                return ecode;
index cd9a332abe425407cf27b5e35a49db912e3a92b3..7482daa2153de5f61773353a4607eb8646a895d8 100644 (file)
@@ -109,7 +109,7 @@ static void check_list(struct tdb_context *tdb, tdb_off_t b_off)
        tdb_off_t off, prev = 0, first;
        struct tdb_free_record r;
 
        tdb_off_t off, prev = 0, first;
        struct tdb_free_record r;
 
-       first = off = tdb_read_off(tdb, b_off);
+       first = off = (tdb_read_off(tdb, b_off) & TDB_OFF_MASK);
        while (off != 0) {
                tdb_read_convert(tdb, off, &r, sizeof(r));
                if (frec_magic(&r) != TDB_FREE_MAGIC)
        while (off != 0) {
                tdb_read_convert(tdb, off, &r, sizeof(r));
                if (frec_magic(&r) != TDB_FREE_MAGIC)
@@ -150,17 +150,21 @@ static enum TDB_ERROR remove_from_list(struct tdb_context *tdb,
 
        /* If prev->next == 0, we were head: update bucket to point to next. */
        if (prev_next == 0) {
 
        /* If prev->next == 0, we were head: update bucket to point to next. */
        if (prev_next == 0) {
-#ifdef CCAN_TDB2_DEBUG
-               if (tdb_read_off(tdb, b_off) != r_off) {
+               /* We must preserve upper bits. */
+               head = tdb_read_off(tdb, b_off);
+               if (TDB_OFF_IS_ERR(head))
+                       return head;
+
+               if ((head & TDB_OFF_MASK) != r_off) {
                        return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
                                          "remove_from_list:"
                                          " %llu head %llu on list %llu",
                                          (long long)r_off,
                        return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
                                          "remove_from_list:"
                                          " %llu head %llu on list %llu",
                                          (long long)r_off,
-                                         (long long)tdb_read_off(tdb, b_off),
+                                         (long long)head,
                                          (long long)b_off);
                }
                                          (long long)b_off);
                }
-#endif
-               ecode = tdb_write_off(tdb, b_off, r->next);
+               head = ((head & ~TDB_OFF_MASK) | r->next);
+               ecode = tdb_write_off(tdb, b_off, head);
                if (ecode != TDB_SUCCESS)
                        return ecode;
        } else {
                if (ecode != TDB_SUCCESS)
                        return ecode;
        } else {
@@ -175,6 +179,7 @@ static enum TDB_ERROR remove_from_list(struct tdb_context *tdb,
                head = tdb_read_off(tdb, b_off);
                if (TDB_OFF_IS_ERR(head))
                        return head;
                head = tdb_read_off(tdb, b_off);
                if (TDB_OFF_IS_ERR(head))
                        return head;
+               head &= TDB_OFF_MASK;
                off = head + offsetof(struct tdb_free_record, magic_and_prev);
        } else {
                /* off = &r->next->prev */
                off = head + offsetof(struct tdb_free_record, magic_and_prev);
        } else {
                /* off = &r->next->prev */
@@ -195,26 +200,29 @@ static enum TDB_ERROR remove_from_list(struct tdb_context *tdb,
        return tdb_write_off(tdb, off, r->magic_and_prev);
 }
 
        return tdb_write_off(tdb, off, r->magic_and_prev);
 }
 
-/* Enqueue in this free bucket. */
+/* Enqueue in this free bucket: sets coalesce if we've added 128
+ * entries to it. */
 static enum TDB_ERROR enqueue_in_free(struct tdb_context *tdb,
                                      tdb_off_t b_off,
                                      tdb_off_t off,
 static enum TDB_ERROR enqueue_in_free(struct tdb_context *tdb,
                                      tdb_off_t b_off,
                                      tdb_off_t off,
-                                     tdb_len_t len)
+                                     tdb_len_t len,
+                                     bool *coalesce)
 {
        struct tdb_free_record new;
        enum TDB_ERROR ecode;
 {
        struct tdb_free_record new;
        enum TDB_ERROR ecode;
-       tdb_off_t prev;
+       tdb_off_t prev, head;
        uint64_t magic = (TDB_FREE_MAGIC << (64 - TDB_OFF_UPPER_STEAL));
 
        uint64_t magic = (TDB_FREE_MAGIC << (64 - TDB_OFF_UPPER_STEAL));
 
+       head = tdb_read_off(tdb, b_off);
+       if (TDB_OFF_IS_ERR(head))
+               return head;
+
        /* We only need to set ftable_and_len; rest is set in enqueue_in_free */
        new.ftable_and_len = ((uint64_t)tdb->ftable << (64 - TDB_OFF_UPPER_STEAL))
                | len;
 
        /* new->next = head. */
        /* We only need to set ftable_and_len; rest is set in enqueue_in_free */
        new.ftable_and_len = ((uint64_t)tdb->ftable << (64 - TDB_OFF_UPPER_STEAL))
                | len;
 
        /* new->next = head. */
-       new.next = tdb_read_off(tdb, b_off);
-       if (TDB_OFF_IS_ERR(new.next)) {
-               return new.next;
-       }
+       new.next = (head & TDB_OFF_MASK);
 
        /* First element?  Prev points to ourselves. */
        if (!new.next) {
 
        /* First element?  Prev points to ourselves. */
        if (!new.next) {
@@ -255,65 +263,23 @@ static enum TDB_ERROR enqueue_in_free(struct tdb_context *tdb,
                }
 #endif
        }
                }
 #endif
        }
-       /* head = new */
-       ecode = tdb_write_off(tdb, b_off, off);
-       if (ecode != TDB_SUCCESS) {
-               return ecode;
-       }
 
 
-       return tdb_write_convert(tdb, off, &new, sizeof(new));
-}
+       /* Update enqueue count, but don't set high bit: see TDB_OFF_IS_ERR */
+       if (*coalesce)
+               head += (1ULL << (64 - TDB_OFF_UPPER_STEAL));
+       head &= ~(TDB_OFF_MASK | (1ULL << 63));
+       head |= off;
 
 
-/* List need not be locked. */
-enum TDB_ERROR add_free_record(struct tdb_context *tdb,
-                              tdb_off_t off, tdb_len_t len_with_header,
-                              enum tdb_lock_flags waitflag)
-{
-       tdb_off_t b_off;
-       tdb_len_t len;
-       enum TDB_ERROR ecode;
-
-       assert(len_with_header >= sizeof(struct tdb_free_record));
-
-       len = len_with_header - sizeof(struct tdb_used_record);
-
-       b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
-       ecode = tdb_lock_free_bucket(tdb, b_off, waitflag);
+       ecode = tdb_write_off(tdb, b_off, head);
        if (ecode != TDB_SUCCESS) {
                return ecode;
        }
 
        if (ecode != TDB_SUCCESS) {
                return ecode;
        }
 
-       ecode = enqueue_in_free(tdb, b_off, off, len);
-       check_list(tdb, b_off);
-       tdb_unlock_free_bucket(tdb, b_off);
-       return ecode;
-}
-
-static size_t adjust_size(size_t keylen, size_t datalen)
-{
-       size_t size = keylen + datalen;
-
-       if (size < TDB_MIN_DATA_LEN)
-               size = TDB_MIN_DATA_LEN;
-
-       /* Round to next uint64_t boundary. */
-       return (size + (sizeof(uint64_t) - 1ULL)) & ~(sizeof(uint64_t) - 1ULL);
-}
-
-/* If we have enough left over to be useful, split that off. */
-static size_t record_leftover(size_t keylen, size_t datalen,
-                             bool want_extra, size_t total_len)
-{
-       ssize_t leftover;
+       /* It's time to coalesce if counter wrapped. */
+       if (*coalesce)
+               *coalesce = ((head & ~TDB_OFF_MASK) == 0);
 
 
-       if (want_extra)
-               datalen += datalen / 2;
-       leftover = total_len - adjust_size(keylen, datalen);
-
-       if (leftover < (ssize_t)sizeof(struct tdb_free_record))
-               return 0;
-
-       return leftover;
+       return tdb_write_convert(tdb, off, &new, sizeof(new));
 }
 
 static tdb_off_t ftable_offset(struct tdb_context *tdb, unsigned int ftable)
 }
 
 static tdb_off_t ftable_offset(struct tdb_context *tdb, unsigned int ftable)
@@ -334,13 +300,12 @@ static tdb_off_t ftable_offset(struct tdb_context *tdb, unsigned int ftable)
        return off;
 }
 
        return off;
 }
 
-/* Note: we unlock the current bucket if fail (-ve), or coalesce (-ve) and
- * need to blatt either of the *protect records (which is set to an error). */
+/* Note: we unlock the current bucket if fail (-ve), or coalesce (+ve) and
+ * need to blatt the *protect record (which is set to an error). */
 static tdb_len_t coalesce(struct tdb_context *tdb,
                          tdb_off_t off, tdb_off_t b_off,
                          tdb_len_t data_len,
 static tdb_len_t coalesce(struct tdb_context *tdb,
                          tdb_off_t off, tdb_off_t b_off,
                          tdb_len_t data_len,
-                         tdb_off_t *protect1,
-                         tdb_off_t *protect2)
+                         tdb_off_t *protect)
 {
        tdb_off_t end;
        struct tdb_free_record rec;
 {
        tdb_off_t end;
        struct tdb_free_record rec;
@@ -405,8 +370,8 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
                }
 
                /* Did we just mess up a record you were hoping to use? */
                }
 
                /* Did we just mess up a record you were hoping to use? */
-               if (end == *protect1 || end == *protect2)
-                       *protect1 = TDB_ERR_NOEXIST;
+               if (end == *protect)
+                       *protect = TDB_ERR_NOEXIST;
 
                ecode = remove_from_list(tdb, nb_off, end, &rec);
                check_list(tdb, nb_off);
 
                ecode = remove_from_list(tdb, nb_off, end, &rec);
                check_list(tdb, nb_off);
@@ -425,8 +390,8 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
                return 0;
 
        /* Before we expand, check this isn't one you wanted protected? */
                return 0;
 
        /* Before we expand, check this isn't one you wanted protected? */
-       if (off == *protect1 || off == *protect2)
-               *protect1 = TDB_ERR_EXISTS;
+       if (off == *protect)
+               *protect = TDB_ERR_EXISTS;
 
        /* OK, expand initial record */
        ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
 
        /* OK, expand initial record */
        ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
@@ -447,11 +412,11 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
                goto err;
        }
 
                goto err;
        }
 
-       /* Try locking violation first... */
-       ecode = add_free_record(tdb, off, end - off, TDB_LOCK_NOWAIT);
+       /* Try locking violation first.  We don't allow coalesce recursion! */
+       ecode = add_free_record(tdb, off, end - off, TDB_LOCK_NOWAIT, false);
        if (ecode != TDB_SUCCESS) {
                /* Need to drop lock.  Can't rely on anything stable. */
        if (ecode != TDB_SUCCESS) {
                /* Need to drop lock.  Can't rely on anything stable. */
-               *protect1 = TDB_ERR_CORRUPT;
+               *protect = TDB_ERR_CORRUPT;
 
                /* We have to drop this to avoid deadlocks, so make sure record
                 * doesn't get coalesced by someone else! */
 
                /* We have to drop this to avoid deadlocks, so make sure record
                 * doesn't get coalesced by someone else! */
@@ -469,11 +434,12 @@ static tdb_len_t coalesce(struct tdb_context *tdb,
                tdb->stats.alloc_coalesce_succeeded++;
                tdb_unlock_free_bucket(tdb, b_off);
 
                tdb->stats.alloc_coalesce_succeeded++;
                tdb_unlock_free_bucket(tdb, b_off);
 
-               ecode = add_free_record(tdb, off, end - off, TDB_LOCK_WAIT);
+               ecode = add_free_record(tdb, off, end - off, TDB_LOCK_WAIT,
+                                       false);
                if (ecode != TDB_SUCCESS) {
                        return ecode;
                }
                if (ecode != TDB_SUCCESS) {
                        return ecode;
                }
-       } else if (TDB_OFF_IS_ERR(*protect1)) {
+       } else if (TDB_OFF_IS_ERR(*protect)) {
                /* For simplicity, we always drop lock if they can't continue */
                tdb_unlock_free_bucket(tdb, b_off);
        }
                /* For simplicity, we always drop lock if they can't continue */
                tdb_unlock_free_bucket(tdb, b_off);
        }
@@ -487,6 +453,109 @@ err:
        return ecode;
 }
 
        return ecode;
 }
 
+/* List is locked: we unlock it. */
+static enum TDB_ERROR coalesce_list(struct tdb_context *tdb,
+                                   tdb_off_t ftable_off, tdb_off_t b_off)
+{
+       enum TDB_ERROR ecode;
+       tdb_off_t off;
+
+       off = tdb_read_off(tdb, b_off);
+       if (TDB_OFF_IS_ERR(off)) {
+               ecode = off;
+               goto unlock_err;
+       }
+       /* A little bit of paranoia */
+       off &= TDB_OFF_MASK;
+
+       while (off) {
+               struct tdb_free_record rec;
+               tdb_len_t coal;
+               tdb_off_t next;
+
+               ecode = tdb_read_convert(tdb, off, &rec, sizeof(rec));
+               if (ecode != TDB_SUCCESS)
+                       goto unlock_err;
+
+               next = rec.next;
+               coal = coalesce(tdb, off, b_off, frec_len(&rec), &next);
+               if (TDB_OFF_IS_ERR(coal)) {
+                       /* This has already unlocked on error. */
+                       return coal;
+               }
+               if (TDB_OFF_IS_ERR(next)) {
+                       /* Coalescing had to unlock, so stop. */
+                       return TDB_SUCCESS;
+               }
+               off = next;
+       }
+
+       tdb_unlock_free_bucket(tdb, b_off);
+       return TDB_SUCCESS;
+
+unlock_err:
+       tdb_unlock_free_bucket(tdb, b_off);
+       return ecode;
+}
+
+/* List must not be locked if coalesce_ok is set. */
+enum TDB_ERROR add_free_record(struct tdb_context *tdb,
+                              tdb_off_t off, tdb_len_t len_with_header,
+                              enum tdb_lock_flags waitflag,
+                              bool coalesce)
+{
+       tdb_off_t b_off;
+       tdb_len_t len;
+       enum TDB_ERROR ecode;
+
+       assert(len_with_header >= sizeof(struct tdb_free_record));
+
+       len = len_with_header - sizeof(struct tdb_used_record);
+
+       b_off = bucket_off(tdb->ftable_off, size_to_bucket(len));
+       ecode = tdb_lock_free_bucket(tdb, b_off, waitflag);
+       if (ecode != TDB_SUCCESS) {
+               return ecode;
+       }
+
+       ecode = enqueue_in_free(tdb, b_off, off, len, &coalesce);
+       check_list(tdb, b_off);
+
+       /* Coalescing unlocks free list. */
+       if (!ecode && coalesce)
+               ecode = coalesce_list(tdb, tdb->ftable_off, b_off);
+       else
+               tdb_unlock_free_bucket(tdb, b_off);
+       return ecode;
+}
+
+static size_t adjust_size(size_t keylen, size_t datalen)
+{
+       size_t size = keylen + datalen;
+
+       if (size < TDB_MIN_DATA_LEN)
+               size = TDB_MIN_DATA_LEN;
+
+       /* Round to next uint64_t boundary. */
+       return (size + (sizeof(uint64_t) - 1ULL)) & ~(sizeof(uint64_t) - 1ULL);
+}
+
+/* If we have enough left over to be useful, split that off. */
+static size_t record_leftover(size_t keylen, size_t datalen,
+                             bool want_extra, size_t total_len)
+{
+       ssize_t leftover;
+
+       if (want_extra)
+               datalen += datalen / 2;
+       leftover = total_len - adjust_size(keylen, datalen);
+
+       if (leftover < (ssize_t)sizeof(struct tdb_free_record))
+               return 0;
+
+       return leftover;
+}
+
 /* We need size bytes to put our key and data in. */
 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
                                tdb_off_t ftable_off,
 /* We need size bytes to put our key and data in. */
 static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
                                tdb_off_t ftable_off,
@@ -499,12 +568,10 @@ static tdb_off_t lock_and_alloc(struct tdb_context *tdb,
        tdb_off_t off, b_off,best_off;
        struct tdb_free_record best = { 0 };
        double multiplier;
        tdb_off_t off, b_off,best_off;
        struct tdb_free_record best = { 0 };
        double multiplier;
-       bool coalesce_after_best = false; /* Damn GCC warning! */
        size_t size = adjust_size(keylen, datalen);
        enum TDB_ERROR ecode;
 
        tdb->stats.allocs++;
        size_t size = adjust_size(keylen, datalen);
        enum TDB_ERROR ecode;
 
        tdb->stats.allocs++;
-again:
        b_off = bucket_off(ftable_off, bucket);
 
        /* FIXME: Try non-blocking wait first, to measure contention. */
        b_off = bucket_off(ftable_off, bucket);
 
        /* FIXME: Try non-blocking wait first, to measure contention. */
@@ -530,10 +597,11 @@ again:
                ecode = off;
                goto unlock_err;
        }
                ecode = off;
                goto unlock_err;
        }
+       off &= TDB_OFF_MASK;
 
        while (off) {
                const struct tdb_free_record *r;
 
        while (off) {
                const struct tdb_free_record *r;
-               tdb_len_t len, coal;
+               tdb_len_t len;
                tdb_off_t next;
 
                r = tdb_access_read(tdb, off, sizeof(*r), true);
                tdb_off_t next;
 
                r = tdb_access_read(tdb, off, sizeof(*r), true);
@@ -555,7 +623,6 @@ again:
                if (frec_len(r) >= size && frec_len(r) < frec_len(&best)) {
                        best_off = off;
                        best = *r;
                if (frec_len(r) >= size && frec_len(r) < frec_len(&best)) {
                        best_off = off;
                        best = *r;
-                       coalesce_after_best = false;
                }
 
                if (frec_len(&best) <= size * multiplier && best_off) {
                }
 
                if (frec_len(&best) <= size * multiplier && best_off) {
@@ -568,19 +635,6 @@ again:
                next = r->next;
                len = frec_len(r);
                tdb_access_release(tdb, r);
                next = r->next;
                len = frec_len(r);
                tdb_access_release(tdb, r);
-
-               /* Since we're going slow anyway, try coalescing here. */
-               coal = coalesce(tdb, off, b_off, len, &best_off, &next);
-               if (TDB_OFF_IS_ERR(coal)) {
-                       /* This has already unlocked on error. */
-                       return coal;
-               }
-               if (TDB_OFF_IS_ERR(best_off)) {
-                       /* This has unlocked list, restart. */
-                       goto again;
-               }
-               if (coal > 0)
-                       coalesce_after_best = true;
                off = next;
        }
 
                off = next;
        }
 
@@ -589,14 +643,6 @@ again:
                struct tdb_used_record rec;
                size_t leftover;
 
                struct tdb_used_record rec;
                size_t leftover;
 
-               /* If we coalesced, we might have change prev/next ptrs. */
-               if (coalesce_after_best) {
-                       ecode = tdb_read_convert(tdb, best_off, &best,
-                                                sizeof(best));
-                       if (ecode != TDB_SUCCESS)
-                               goto unlock_err;
-               }
-
                /* We're happy with this size: take it. */
                ecode = remove_from_list(tdb, b_off, best_off, &best);
                check_list(tdb, b_off);
                /* We're happy with this size: take it. */
                ecode = remove_from_list(tdb, b_off, best_off, &best);
                check_list(tdb, b_off);
@@ -637,7 +683,7 @@ again:
                        ecode = add_free_record(tdb,
                                                best_off + sizeof(rec)
                                                + frec_len(&best) - leftover,
                        ecode = add_free_record(tdb,
                                                best_off + sizeof(rec)
                                                + frec_len(&best) - leftover,
-                                               leftover, TDB_LOCK_WAIT);
+                                               leftover, TDB_LOCK_WAIT, false);
                        if (ecode != TDB_SUCCESS) {
                                best_off = ecode;
                        }
                        if (ecode != TDB_SUCCESS) {
                                best_off = ecode;
                        }
@@ -811,7 +857,7 @@ static enum TDB_ERROR tdb_expand(struct tdb_context *tdb, tdb_len_t size)
        tdb_unlock_expand(tdb, F_WRLCK);
 
        tdb->stats.expands++;
        tdb_unlock_expand(tdb, F_WRLCK);
 
        tdb->stats.expands++;
-       return add_free_record(tdb, old_size, wanted, TDB_LOCK_WAIT);
+       return add_free_record(tdb, old_size, wanted, TDB_LOCK_WAIT, true);
 }
 
 /* This won't fail: it will expand the database if it has to. */
 }
 
 /* This won't fail: it will expand the database if it has to. */
index 213e83615a33cc00090175badb2b05e1c3aca24e..d93fcfe8117f87453504efaf18854ae1a55c6920 100644 (file)
@@ -466,7 +466,8 @@ tdb_off_t alloc(struct tdb_context *tdb, size_t keylen, size_t datalen,
 /* Put this record in a free list. */
 enum TDB_ERROR add_free_record(struct tdb_context *tdb,
                               tdb_off_t off, tdb_len_t len_with_header,
 /* Put this record in a free list. */
 enum TDB_ERROR add_free_record(struct tdb_context *tdb,
                               tdb_off_t off, tdb_len_t len_with_header,
-                              enum tdb_lock_flags waitflag);
+                              enum tdb_lock_flags waitflag,
+                              bool coalesce_ok);
 
 /* Set up header for a used/ftable/htable/chain record. */
 enum TDB_ERROR set_header(struct tdb_context *tdb,
 
 /* Set up header for a used/ftable/htable/chain record. */
 enum TDB_ERROR set_header(struct tdb_context *tdb,
index d7b5163be95eba66ac204f1be75dc239350c41f9..f11701d9ea9c686cfc2b0beefbea069eb3602542 100644 (file)
@@ -42,7 +42,7 @@ static enum TDB_ERROR replace_data(struct tdb_context *tdb,
                ecode = add_free_record(tdb, old_off,
                                        sizeof(struct tdb_used_record)
                                        + key.dsize + old_room,
                ecode = add_free_record(tdb, old_off,
                                        sizeof(struct tdb_used_record)
                                        + key.dsize + old_room,
-                                       TDB_LOCK_WAIT);
+                                       TDB_LOCK_WAIT, true);
                if (ecode == TDB_SUCCESS)
                        ecode = replace_in_hash(tdb, h, new_off);
        } else {
                if (ecode == TDB_SUCCESS)
                        ecode = replace_in_hash(tdb, h, new_off);
        } else {
@@ -292,7 +292,7 @@ enum TDB_ERROR tdb_delete(struct tdb_context *tdb, struct tdb_data key)
                                + rec_key_length(&rec)
                                + rec_data_length(&rec)
                                + rec_extra_padding(&rec),
                                + rec_key_length(&rec)
                                + rec_data_length(&rec)
                                + rec_extra_padding(&rec),
-                               TDB_LOCK_WAIT);
+                               TDB_LOCK_WAIT, true);
 
        if (tdb->flags & TDB_SEQNUM)
                tdb_inc_seqnum(tdb);
 
        if (tdb->flags & TDB_SEQNUM)
                tdb_inc_seqnum(tdb);
index be54fe977891b8ea6835c8d40516abb5289214bb..6fcee6d482a581f35d4974f29de161b502b4682a 100644 (file)
@@ -150,7 +150,7 @@ static void add_to_freetable(struct tdb_context *tdb,
        tdb->ftable_off = freetable->base.off;
        tdb->ftable = ftable;
        add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen,
        tdb->ftable_off = freetable->base.off;
        tdb->ftable = ftable;
        add_free_record(tdb, eoff, sizeof(struct tdb_used_record) + elen,
-                       TDB_LOCK_WAIT);
+                       TDB_LOCK_WAIT, false);
 }
 
 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
 }
 
 static tdb_off_t hbucket_off(tdb_off_t group_start, unsigned ingroup)
index e60e341eca46d9fe359a98602f144184f41271bc..3fdd11c07706127d2f8d60857efaddfcdbd38dac 100644 (file)
@@ -52,7 +52,7 @@ int main(int argc, char *argv[])
        /* Lock and fail to coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[1].base.off;
        /* Lock and fail to coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[1].base.off;
-       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len, &test, &test)
+       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, len, &test)
            == 0);
        tdb_unlock_free_bucket(tdb, b_off);
        ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
            == 0);
        tdb_unlock_free_bucket(tdb, b_off);
        ok1(free_record_length(tdb, layout->elem[1].base.off) == len);
@@ -75,7 +75,7 @@ int main(int argc, char *argv[])
        /* Lock and fail to coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[1].base.off;
        /* Lock and fail to coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[1].base.off;
-       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
            == 0);
        tdb_unlock_free_bucket(tdb, b_off);
        ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
            == 0);
        tdb_unlock_free_bucket(tdb, b_off);
        ok1(free_record_length(tdb, layout->elem[1].base.off) == 1024);
@@ -99,7 +99,7 @@ int main(int argc, char *argv[])
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[2].base.off;
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[2].base.off;
-       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
            == 1024 + sizeof(struct tdb_used_record) + 2048);
        /* Should tell us it's erased this one... */
        ok1(test == TDB_ERR_NOEXIST);
            == 1024 + sizeof(struct tdb_used_record) + 2048);
        /* Should tell us it's erased this one... */
        ok1(test == TDB_ERR_NOEXIST);
@@ -126,7 +126,7 @@ int main(int argc, char *argv[])
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[2].base.off;
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[2].base.off;
-       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
            == 1024 + sizeof(struct tdb_used_record) + 512);
        ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
        ok1(free_record_length(tdb, layout->elem[1].base.off)
            == 1024 + sizeof(struct tdb_used_record) + 512);
        ok1(tdb->file->allrecord_lock.count == 0 && tdb->file->num_lockrecs == 0);
        ok1(free_record_length(tdb, layout->elem[1].base.off)
@@ -153,7 +153,7 @@ int main(int argc, char *argv[])
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[2].base.off;
        /* Lock and coalesce. */
        ok1(tdb_lock_free_bucket(tdb, b_off, TDB_LOCK_WAIT) == 0);
        test = layout->elem[2].base.off;
-       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test, &test)
+       ok1(coalesce(tdb, layout->elem[1].base.off, b_off, 1024, &test)
            == 1024 + sizeof(struct tdb_used_record) + 512
            + sizeof(struct tdb_used_record) + 256);
        ok1(tdb->file->allrecord_lock.count == 0
            == 1024 + sizeof(struct tdb_used_record) + 512
            + sizeof(struct tdb_used_record) + 256);
        ok1(tdb->file->allrecord_lock.count == 0
index b92b6bdde05a4d1928094576b03206e9b194864d..815011faeb2c68e870efd1e0600da7ffdaa0c373 100644 (file)
@@ -177,7 +177,7 @@ int main(int argc, char *argv[])
                                    + rec_key_length(&rec)
                                    + rec_data_length(&rec)
                                    + rec_extra_padding(&rec),
                                    + rec_key_length(&rec)
                                    + rec_data_length(&rec)
                                    + rec_extra_padding(&rec),
-                                   TDB_LOCK_NOWAIT) == 0);
+                                   TDB_LOCK_NOWAIT, false) == 0);
                ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
                                      F_WRLCK) == 0);
                ok1(tdb_check(tdb, NULL, NULL) == 0);
                ok1(tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range,
                                      F_WRLCK) == 0);
                ok1(tdb_check(tdb, NULL, NULL) == 0);
index 206fc158bea887e0bd7312a800d4740346fd000f..20c85efa10f9a269dd810b62fe2973a0ae9b3cf9 100644 (file)
@@ -43,11 +43,11 @@ int main(int argc, char *argv[])
                /* This makes a sparse file */
                ok1(ftruncate(tdb->file->fd, 0xFFFFFFF0) == 0);
                ok1(add_free_record(tdb, old_size, 0xFFFFFFF0 - old_size,
                /* This makes a sparse file */
                ok1(ftruncate(tdb->file->fd, 0xFFFFFFF0) == 0);
                ok1(add_free_record(tdb, old_size, 0xFFFFFFF0 - old_size,
-                                   TDB_LOCK_WAIT) == TDB_SUCCESS);
+                                   TDB_LOCK_WAIT, false) == TDB_SUCCESS);
 
                /* Now add a little record past the 4G barrier. */
                ok1(tdb_expand_file(tdb, 100) == TDB_SUCCESS);
 
                /* Now add a little record past the 4G barrier. */
                ok1(tdb_expand_file(tdb, 100) == TDB_SUCCESS);
-               ok1(add_free_record(tdb, 0xFFFFFFF0, 100, TDB_LOCK_WAIT)
+               ok1(add_free_record(tdb, 0xFFFFFFF0, 100, TDB_LOCK_WAIT, false)
                    == TDB_SUCCESS);
 
                ok1(tdb_check(tdb, NULL, NULL) == TDB_SUCCESS);
                    == TDB_SUCCESS);
 
                ok1(tdb_check(tdb, NULL, NULL) == TDB_SUCCESS);
index a27c027365eec3fc7be001d9693bde42ec6f0b45..f1414391abe42caa7b81504dede1ea3700291c1b 100644 (file)
@@ -689,7 +689,7 @@ static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb,
                tdb->stats.frees++;
                ecode = add_free_record(tdb, recovery_head,
                                        sizeof(rec) + rec.max_len,
                tdb->stats.frees++;
                ecode = add_free_record(tdb, recovery_head,
                                        sizeof(rec) + rec.max_len,
-                                       TDB_LOCK_WAIT);
+                                       TDB_LOCK_WAIT, true);
                if (ecode != TDB_SUCCESS) {
                        return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
                                          "tdb_recovery_allocate:"
                if (ecode != TDB_SUCCESS) {
                        return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
                                          "tdb_recovery_allocate:"