]> git.ozlabs.org Git - ccan/blobdiff - ccan/tdb2/lock.c
check_type: fix incorrect documentation.
[ccan] / ccan / tdb2 / lock.c
index 49313a5419cb7b83b7f672aaff32f5942ddf079b..a71c95f6e5cc9479288deae43581361019361c38 100644 (file)
@@ -30,7 +30,7 @@
 #include <ccan/build_assert/build_assert.h>
 
 /* If we were threaded, we could wait for unlock, but we're not, so fail. */
-static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
+enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
 {
        return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
                          "%s: lock owned by another tdb in this process.",
@@ -38,8 +38,7 @@ static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
 }
 
 /* If we fork, we no longer really own locks. */
-static bool check_lock_pid(struct tdb_context *tdb,
-                          const char *call, bool log)
+bool check_lock_pid(struct tdb_context *tdb, const char *call, bool log)
 {
        /* No locks?  No problem! */
        if (tdb->file->allrecord_lock.count == 0
@@ -99,16 +98,21 @@ int tdb_fcntl_unlock(int fd, int rw, off_t off, off_t len, void *unused)
 static int lock(struct tdb_context *tdb,
                      int rw, off_t off, off_t len, bool waitflag)
 {
+       int ret;
        if (tdb->file->allrecord_lock.count == 0
            && tdb->file->num_lockrecs == 0) {
                tdb->file->locker = getpid();
        }
 
        tdb->stats.lock_lowlevel++;
-       if (!waitflag)
+       ret = tdb->lock_fn(tdb->file->fd, rw, off, len, waitflag,
+                          tdb->lock_data);
+       if (!waitflag) {
                tdb->stats.lock_nonblock++;
-       return tdb->lock_fn(tdb->file->fd, rw, off, len, waitflag,
-                           tdb->lock_data);
+               if (ret != 0)
+                       tdb->stats.lock_nonblock_fail++;
+       }
+       return ret;
 }
 
 static int unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
@@ -179,9 +183,9 @@ static int unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
 
    note that a len of zero means lock to end of file
 */
-static enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
-                                int rw_type, tdb_off_t offset, tdb_off_t len,
-                                enum tdb_lock_flags flags)
+enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
+                         int rw_type, tdb_off_t offset, tdb_off_t len,
+                         enum tdb_lock_flags flags)
 {
        int ret;
 
@@ -189,7 +193,7 @@ static enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
                return TDB_SUCCESS;
        }
 
-       if (rw_type == F_WRLCK && tdb->read_only) {
+       if (rw_type == F_WRLCK && (tdb->flags & TDB_RDONLY)) {
                return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
                                  "Write lock attempted on read-only database");
        }
@@ -221,8 +225,8 @@ static enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
        return TDB_SUCCESS;
 }
 
-static enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
-                                  int rw_type, tdb_off_t offset, size_t len)
+enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
+                           int rw_type, tdb_off_t offset, size_t len)
 {
        if (tdb->flags & TDB_NOLOCK) {
                return TDB_SUCCESS;
@@ -247,7 +251,7 @@ static enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
   deadlock detection and claim a deadlock when progress can be
   made. For those OSes we may loop for a while.
 */
-enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
+enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb, off_t start)
 {
        int count = 1000;
 
@@ -273,8 +277,7 @@ enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
 
        while (count--) {
                struct timeval tv;
-               if (tdb_brlock(tdb, F_WRLCK,
-                              TDB_HASH_LOCK_START, 0,
+               if (tdb_brlock(tdb, F_WRLCK, start, 0,
                               TDB_LOCK_WAIT|TDB_LOCK_PROBE) == TDB_SUCCESS) {
                        tdb->file->allrecord_lock.ltype = F_WRLCK;
                        tdb->file->allrecord_lock.off = 0;
@@ -323,28 +326,29 @@ enum TDB_ERROR tdb_lock_and_recover(struct tdb_context *tdb)
                return ecode;
        }
 
-       ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
+       ecode = tdb_lock_open(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
        if (ecode != TDB_SUCCESS) {
                tdb_allrecord_unlock(tdb, F_WRLCK);
                return ecode;
        }
        ecode = tdb_transaction_recover(tdb);
-       tdb_unlock_open(tdb);
+       tdb_unlock_open(tdb, F_WRLCK);
        tdb_allrecord_unlock(tdb, F_WRLCK);
 
        return ecode;
 }
 
 /* lock an offset in the database. */
-static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
-                                   tdb_off_t offset, int ltype,
-                                   enum tdb_lock_flags flags)
+enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
+                            tdb_off_t offset, int ltype,
+                            enum tdb_lock_flags flags)
 {
        struct tdb_lock *new_lck;
        enum TDB_ERROR ecode;
 
-       if (offset > (TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
-                     + tdb->file->map_size / 8)) {
+       if (!(tdb->flags & TDB_VERSION1)
+           && offset > (TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
+                        + tdb->file->map_size / 8)) {
                return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
                                  "tdb_nest_lock: invalid offset %zu ltype=%d",
                                  (size_t)offset, ltype);
@@ -411,7 +415,7 @@ static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
                        tdb_brunlock(tdb, ltype, offset, 1);
 
                        if (berr < 0)
-                               return berr;
+                               return TDB_OFF_TO_ERR(berr);
                        ecode = tdb_lock_and_recover(tdb);
                        if (ecode == TDB_SUCCESS) {
                                ecode = tdb_brlock(tdb, ltype, offset, 1,
@@ -432,8 +436,8 @@ static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
        return TDB_SUCCESS;
 }
 
-static enum TDB_ERROR tdb_nest_unlock(struct tdb_context *tdb,
-                                     tdb_off_t off, int ltype)
+enum TDB_ERROR tdb_nest_unlock(struct tdb_context *tdb,
+                              tdb_off_t off, int ltype)
 {
        struct tdb_lock *lck;
        enum TDB_ERROR ecode;
@@ -488,9 +492,9 @@ void tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
 
 /* We only need to lock individual bytes, but Linux merges consecutive locks
  * so we lock in contiguous ranges. */
-static enum TDB_ERROR tdb_lock_gradual(struct tdb_context *tdb,
-                                      int ltype, enum tdb_lock_flags flags,
-                                      tdb_off_t off, tdb_off_t len)
+enum TDB_ERROR tdb_lock_gradual(struct tdb_context *tdb,
+                               int ltype, enum tdb_lock_flags flags,
+                               tdb_off_t off, tdb_off_t len)
 {
        enum TDB_ERROR ecode;
        enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
@@ -503,8 +507,9 @@ static enum TDB_ERROR tdb_lock_gradual(struct tdb_context *tdb,
        }
 
        /* First we try non-blocking. */
-       if (tdb_brlock(tdb, ltype, off, len, nb_flags) == TDB_SUCCESS) {
-               return TDB_SUCCESS;
+       ecode = tdb_brlock(tdb, ltype, off, len, nb_flags);
+       if (ecode != TDB_ERR_LOCK) {
+               return ecode;
        }
 
        /* Try locking first half, then second. */
@@ -528,6 +533,12 @@ enum TDB_ERROR tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
        enum TDB_ERROR ecode;
        tdb_bool_err berr;
 
+       if (tdb->flags & TDB_VERSION1) {
+               if (tdb1_allrecord_lock(tdb, ltype, flags, upgradable) == -1)
+                       return tdb->last_error;
+               return TDB_SUCCESS;
+       }
+
        if (tdb->flags & TDB_NOLOCK)
                return TDB_SUCCESS;
 
@@ -602,7 +613,7 @@ again:
 
        tdb_allrecord_unlock(tdb, ltype);
        if (berr < 0)
-               return berr;
+               return TDB_OFF_TO_ERR(berr);
        ecode = tdb_lock_and_recover(tdb);
        if (ecode != TDB_SUCCESS) {
                return ecode;
@@ -610,14 +621,15 @@ again:
        goto again;
 }
 
-enum TDB_ERROR tdb_lock_open(struct tdb_context *tdb, enum tdb_lock_flags flags)
+enum TDB_ERROR tdb_lock_open(struct tdb_context *tdb,
+                            int ltype, enum tdb_lock_flags flags)
 {
-       return tdb_nest_lock(tdb, TDB_OPEN_LOCK, F_WRLCK, flags);
+       return tdb_nest_lock(tdb, TDB_OPEN_LOCK, ltype, flags);
 }
 
-void tdb_unlock_open(struct tdb_context *tdb)
+void tdb_unlock_open(struct tdb_context *tdb, int ltype)
 {
-       tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
+       tdb_nest_unlock(tdb, TDB_OPEN_LOCK, ltype);
 }
 
 bool tdb_has_open_lock(struct tdb_context *tdb)
@@ -641,6 +653,11 @@ void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
 /* unlock entire db */
 void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
 {
+       if (tdb->flags & TDB_VERSION1) {
+               tdb1_allrecord_unlock(tdb, ltype);
+               return;
+       }
+
        if (tdb->flags & TDB_NOLOCK)
                return;
 
@@ -769,6 +786,11 @@ enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
                        return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
                                          "tdb_unlock_hashes RO allrecord!");
                }
+               if (tdb->file->allrecord_lock.owner != tdb) {
+                       return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
+                                         "tdb_unlock_hashes:"
+                                         " not locked by us!");
+               }
                return TDB_SUCCESS;
        }
 
@@ -799,6 +821,10 @@ enum TDB_ERROR tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
                if (!check_lock_pid(tdb, "tdb_lock_free_bucket", true))
                        return TDB_ERR_LOCK;
 
+               if (tdb->file->allrecord_lock.owner != tdb) {
+                       return owner_conflict(tdb, "tdb_lock_free_bucket");
+               }
+
                if (tdb->file->allrecord_lock.ltype == F_WRLCK)
                        return 0;
                return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,