tdb: use tdb_lockall_gradual by default.
authorRusty Russell <rusty@rustcorp.com.au>
Fri, 13 Aug 2010 05:37:16 +0000 (15:07 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Fri, 13 Aug 2010 05:37:16 +0000 (15:07 +0930)
This has the advantage the transactions will also use it, preventing them
from blocking.

ccan/tdb/lock.c
ccan/tdb/tdb.h
ccan/tdb/tools/starvation.c

index 977091597f7d5d1247e815292e15605be6bab4bc..dfcac7a540adc30482dcd86e0c299b36bfb52e0b 100644 (file)
@@ -152,14 +152,6 @@ int tdb_brlock(struct tdb_context *tdb,
                return -1;
        }
 
-       /* Sanity check */
-       if (tdb->transaction && offset >= lock_offset(-1) && len != 0) {
-               tdb->ecode = TDB_ERR_RDONLY;
-               TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_brlock attempted in transaction at offset %d rw_type=%d flags=%d len=%d\n",
-                        offset, rw_type, flags, (int)len));
-               return -1;
-       }
-
        do {
                ret = fcntl_lock(tdb, rw_type, offset, len,
                                 flags & TDB_LOCK_WAIT);
@@ -520,8 +512,43 @@ static int tdb_allrecord_check(struct tdb_context *tdb, int ltype,
        return 1;
 }
 
+/* We only need to lock individual bytes, but Linux merges consecutive locks
+ * so we lock in contiguous ranges. */
+static int tdb_chainlock_gradual(struct tdb_context *tdb,
+                                int ltype, enum tdb_lock_flags flags,
+                                size_t off, size_t len)
+{
+       int ret;
+       enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
+
+       if (len <= 4) {
+               /* Single record.  Just do blocking lock. */
+               return tdb_brlock(tdb, ltype, off, len, flags);
+       }
+
+       /* First we try non-blocking. */
+       ret = tdb_brlock(tdb, ltype, off, len, nb_flags);
+       if (ret == 0) {
+               return 0;
+       }
+
+       /* Try locking first half, then second. */
+       ret = tdb_chainlock_gradual(tdb, ltype, flags, off, len / 2);
+       if (ret == -1)
+               return -1;
+
+       ret = tdb_chainlock_gradual(tdb, ltype, flags,
+                                   off + len / 2, len - len / 2);
+       if (ret == -1) {
+               tdb_brunlock(tdb, ltype, off, len / 2);
+               return -1;
+       }
+       return 0;
+}
+
 /* lock/unlock entire database.  It can only be upgradable if you have some
- * other way of guaranteeing exclusivity (ie. transaction write lock). */
+ * other way of guaranteeing exclusivity (ie. transaction write lock).
+ * We do the locking gradually to avoid being starved by smaller locks. */
 int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                       enum tdb_lock_flags flags, bool upgradable)
 {
@@ -532,10 +559,23 @@ int tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
                return 0;
        }
 
-       if (tdb_brlock(tdb, ltype, FREELIST_TOP, 0, flags)) {
-               if (flags & TDB_LOCK_WAIT) {
-                       TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
-               }
+       /* We cover two kinds of locks:
+        * 1) Normal chain locks.  Taken for almost all operations.
+        * 3) Individual records locks.  Taken after normal or free
+        *    chain locks.
+        *
+        * It is (1) which cause the starvation problem, so we're only
+        * gradual for that. */
+       if (tdb_chainlock_gradual(tdb, ltype, flags, FREELIST_TOP,
+                                 tdb->header.hash_size * 4) == -1) {
+               return -1;
+       }
+
+       /* Grab individual record locks. */
+       if (tdb_brlock(tdb, ltype, lock_offset(tdb->header.hash_size), 0,
+                      flags) == -1) {
+               tdb_brunlock(tdb, ltype, FREELIST_TOP,
+                            tdb->header.hash_size * 4);
                return -1;
        }
 
@@ -660,85 +700,6 @@ int tdb_unlockall_read(struct tdb_context *tdb)
        return tdb_allrecord_unlock(tdb, F_RDLCK, false);
 }
 
-/* We only need to lock individual bytes, but Linux merges consecutive locks
- * so we lock in contiguous ranges. */
-static int tdb_chainlock_gradual(struct tdb_context *tdb,
-                                size_t off, size_t len)
-{
-       int ret;
-
-       if (len <= 4) {
-               /* Single record.  Just do blocking lock. */
-               return tdb_brlock(tdb, F_WRLCK, off, len, TDB_LOCK_WAIT);
-       }
-
-       /* First we try non-blocking. */
-       ret = tdb_brlock(tdb, F_WRLCK, off, len, TDB_LOCK_NOWAIT);
-       if (ret == 0) {
-               return 0;
-       }
-
-       /* Try locking first half, then second. */
-       ret = tdb_chainlock_gradual(tdb, off, len / 2);
-       if (ret == -1)
-               return -1;
-
-       ret = tdb_chainlock_gradual(tdb, off + len / 2, len - len / 2);
-       if (ret == -1) {
-               tdb_brunlock(tdb, F_WRLCK, off, len / 2);
-               return -1;
-       }
-       return 0;
-}
-
-/* We do the locking gradually to avoid being starved by smaller locks. */
-int tdb_lockall_gradual(struct tdb_context *tdb)
-{
-       int ret;
-
-       /* This checks for other locks, nesting. */
-       ret = tdb_allrecord_check(tdb, F_WRLCK, TDB_LOCK_WAIT, false);
-       if (ret == -1 || ret == 0)
-               return ret;
-
-       /* We cover two kinds of locks:
-        * 1) Normal chain locks.  Taken for almost all operations.
-        * 3) Individual records locks.  Taken after normal or free
-        *    chain locks.
-        *
-        * It is (1) which cause the starvation problem, so we're only
-        * gradual for that. */
-       if (tdb_chainlock_gradual(tdb, FREELIST_TOP,
-                                 tdb->header.hash_size * 4) == -1) {
-               return -1;
-       }
-
-       /* Grab individual record locks. */
-       if (tdb_brlock(tdb, F_WRLCK, lock_offset(tdb->header.hash_size), 0,
-                      TDB_LOCK_WAIT) == -1) {
-               tdb_brunlock(tdb, F_WRLCK, FREELIST_TOP,
-                            tdb->header.hash_size * 4);
-               return -1;
-       }
-
-       /* That adds up to an allrecord lock. */
-       tdb->allrecord_lock.count = 1;
-       tdb->allrecord_lock.ltype = F_WRLCK;
-       tdb->allrecord_lock.off = false;
-
-       /* Just check we don't need recovery... */
-       if (tdb_needs_recovery(tdb)) {
-               tdb_allrecord_unlock(tdb, F_WRLCK, false);
-               if (tdb_lock_and_recover(tdb) == -1) {
-                       return -1;
-               }
-               /* Try again. */
-               return tdb_lockall_gradual(tdb);
-       }
-
-       return 0;
-}
-
 /* lock/unlock one hash chain. This is meant to be used to reduce
    contention - it cannot guarantee how many records will be locked */
 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
index 7fa9517038469e38569c738421746d717ab674fb..b84e20560021d45034bb4f29df78da30fb4cba30 100644 (file)
@@ -132,7 +132,6 @@ int tdb_lockall_read_nonblock(struct tdb_context *tdb);
 int tdb_unlockall_read(struct tdb_context *tdb);
 int tdb_lockall_mark(struct tdb_context *tdb);
 int tdb_lockall_unmark(struct tdb_context *tdb);
-int tdb_lockall_gradual(struct tdb_context *tdb);
 const char *tdb_name(struct tdb_context *tdb);
 int tdb_fd(struct tdb_context *tdb);
 tdb_log_func tdb_log_fn(struct tdb_context *tdb);
index 5f8c23d1b9a50d4b423c86b3a74d03933b76b855..1baeab6e91dbcd0421c4a4fb022261bfb49c0ac5 100644 (file)
@@ -77,9 +77,13 @@ int main(int argc, char *argv[])
 
        if (strcmp(argv[1], "lockall") == 0)
                lockall = tdb_lockall;
-       else if (strcmp(argv[1], "gradual") == 0)
+       else if (strcmp(argv[1], "gradual") == 0) {
+#if 0
                lockall = tdb_lockall_gradual;
-       else
+#else
+               errx(1, "gradual is now the default implementation");
+#endif
+       } else
                usage("Arg1 should be 'lockall' or 'gradual'");
 
        num = atoi(argv[2]);