2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 1999-2005
7 Copyright (C) Paul `Rusty' Russell 2000
8 Copyright (C) Jeremy Allison 2000-2003
10 ** NOTE! The following LGPL license applies to the tdb
11 ** library. This does NOT imply that all of Samba is released
14 This library is free software; you can redistribute it and/or
15 modify it under the terms of the GNU Lesser General Public
16 License as published by the Free Software Foundation; either
17 version 3 of the License, or (at your option) any later version.
19 This library is distributed in the hope that it will be useful,
20 but WITHOUT ANY WARRANTY; without even the implied warranty of
21 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22 Lesser General Public License for more details.
24 You should have received a copy of the GNU Lesser General Public
25 License along with this library; if not, see <http://www.gnu.org/licenses/>.
30 #include <ccan/build_assert/build_assert.h>
/*
 * owner_conflict - report that a lock is already owned by another tdb
 * context in this process.
 *
 * Logs a TDB_ERR_LOCK usage error (TDB_LOG_USE_ERROR) through tdb_logerr()
 * and returns tdb_logerr()'s result.  The message starts with "%s",
 * presumably filled from `call` (the caller's name) -- the argument line is
 * not visible in this copy, so confirm.
 */
32 /* If we were threaded, we could wait for unlock, but we're not, so fail. */
33 static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
35 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
36 "%s: lock owned by another tdb in this process.",
/*
 * fcntl_lock - issue the fcntl() lock request on the database file.
 *
 * Calls fcntl() on tdb->file->fd with a lock description `fl` whose
 * l_whence is SEEK_SET, using either F_SETLKW or F_SETLK, and returns
 * fcntl()'s result unchanged.  The lock_lowlevel statistic is bumped via
 * add_stat(); lock_nonblock is bumped just before the F_SETLK call.
 *
 * NOTE(review): presumably `waitflag` selects the F_SETLKW path and
 * `rw`/`off`/`len` fill the remaining fields of `fl` -- those lines are not
 * visible in this copy; confirm.
 */
40 static int fcntl_lock(struct tdb_context *tdb,
41 int rw, off_t off, off_t len, bool waitflag)
46 fl.l_whence = SEEK_SET;
51 add_stat(tdb, lock_lowlevel, 1);
53 return fcntl(tdb->file->fd, F_SETLKW, &fl);
55 add_stat(tdb, lock_nonblock, 1);
56 return fcntl(tdb->file->fd, F_SETLK, &fl);
/*
 * fcntl_unlock - issue the fcntl() call used by tdb_brunlock().
 *
 * The live path sets fl.l_whence to SEEK_SET and returns the result of
 * fcntl(tdb->file->fd, F_SETLKW, &fl).
 * NOTE(review): presumably l_type is F_UNLCK and `off`/`len` give the
 * range -- those assignments are not visible in this copy; confirm.
 *
 * The section introduced by "#if 0" is a debugging aid.  It reads
 * /proc/locks line by line, looks for " POSIX ADVISORY " entries, compares
 * the pid field against getpid(), and prints to stderr when a lock's length
 * or type does not match, or when no matching lock is found.
 * NOTE(review): the parser tests for " POSIX ADVISORY " but then advances
 * by strlen(" FLOCK ADVISORY "); both strings have the same length so the
 * result is identical, but the mismatch is confusing.
 */
60 static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
63 #if 0 /* Check they matched up locks and unlocks correctly. */
68 locks = fopen("/proc/locks", "r");
70 while (fgets(line, 80, locks)) {
74 /* eg. 1: FLOCK ADVISORY WRITE 2440 08:01:2180826 0 EOF */
75 p = strchr(line, ':') + 1;
76 if (strncmp(p, " POSIX ADVISORY ", strlen(" POSIX ADVISORY ")))
78 p += strlen(" FLOCK ADVISORY ");
79 if (strncmp(p, "READ ", strlen("READ ")) == 0)
81 else if (strncmp(p, "WRITE ", strlen("WRITE ")) == 0)
86 if (atoi(p) != getpid())
88 p = strchr(strchr(p, ' ') + 1, ' ') + 1;
90 p = strchr(p, ' ') + 1;
91 if (strncmp(p, "EOF", 3) == 0)
94 l = atoi(p) - start + 1;
98 fprintf(stderr, "Len %u should be %u: %s",
103 fprintf(stderr, "Type %s wrong: %s",
104 rw == F_RDLCK ? "READ" : "WRITE", line);
113 fprintf(stderr, "Unlock on %u@%u not found!",
122 fl.l_whence = SEEK_SET;
127 return fcntl(tdb->file->fd, F_SETLKW, &fl);
130 /* a byte range locking function - return 0 on success
131 this function locks len bytes at the specified offset.
133 note that a len of zero means lock to end of file */
/*
 * tdb_brlock - take a byte-range lock of `len` bytes at `offset`.
 *
 * Visible steps, in order:
 *  - special-cases databases that have TDB_NOLOCK set;
 *  - refuses F_WRLCK on a read-only database (TDB_ERR_RDONLY);
 *  - refuses ranges where offset + len does not fit in size_t (TDB_ERR_IO);
 *  - calls fcntl_lock(), passing (flags & TDB_LOCK_WAIT) as the wait flag,
 *    and retries while it fails with EINTR;
 *  - on failure, logs a TDB_ERR_LOCK message unless TDB_LOCK_PROBE is set
 *    or errno is EAGAIN.
 *
 * NOTE(review): the return statements for the TDB_NOLOCK path and for the
 * final success/failure paths are not visible in this copy; confirm them.
 * NOTE(review): "%llu" is paired with a (long long) cast; an
 * (unsigned long long) cast would match the conversion exactly.
 */
135 static enum TDB_ERROR tdb_brlock(struct tdb_context *tdb,
136 int rw_type, tdb_off_t offset, tdb_off_t len,
137 enum tdb_lock_flags flags)
141 if (tdb->flags & TDB_NOLOCK) {
145 if (rw_type == F_WRLCK && tdb->read_only) {
146 return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
147 "Write lock attempted on read-only database");
150 /* A 32 bit system cannot open a 64-bit file, but it could have
151 * expanded since then: check here. */
152 if ((size_t)(offset + len) != offset + len) {
153 return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
154 "tdb_brlock: lock on giant offset %llu",
155 (long long)(offset + len));
159 ret = fcntl_lock(tdb, rw_type, offset, len,
160 flags & TDB_LOCK_WAIT);
161 } while (ret == -1 && errno == EINTR);
164 /* Generic lock error. errno set by fcntl.
165 * EAGAIN is an expected return from non-blocking
167 if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
168 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
169 "tdb_brlock failed (fd=%d) at"
170 " offset %zu rw_type=%d flags=%d len=%zu:"
172 tdb->file->fd, (size_t)offset, rw_type,
173 flags, (size_t)len, strerror(errno));
/*
 * tdb_brunlock - release a byte-range lock of `len` bytes at `offset`.
 *
 * Special-cases databases that have TDB_NOLOCK set, then calls
 * fcntl_unlock(), retrying while it fails with EINTR.  The failure path
 * returns a TDB_ERR_LOCK error logged through tdb_logerr().
 *
 * NOTE(review): the condition guarding the error return and the success
 * return are not visible in this copy; confirm.
 */
180 static enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
181 int rw_type, tdb_off_t offset, size_t len)
185 if (tdb->flags & TDB_NOLOCK) {
190 ret = fcntl_unlock(tdb, rw_type, offset, len);
191 } while (ret == -1 && errno == EINTR);
194 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
195 "tdb_brunlock failed (fd=%d) at offset %zu"
196 " rw_type=%d len=%zu",
197 tdb->file->fd, (size_t)offset, rw_type,
204 /* upgrade a read lock to a write lock. This needs to be handled in a
205 special way as some OSes (such as Solaris) have too conservative
206 deadlock detection and claim a deadlock when progress can be
207 made. For those OSes we may loop for a while. */
/*
 * tdb_allrecord_upgrade - turn an upgradable allrecord lock into a write
 * lock.
 *
 * Preconditions checked (each failure returns a TDB_ERR_LOCK error):
 *  - allrecord_lock.count must be exactly 1;
 *  - allrecord_lock.off must be 1, the value tdb_allrecord_lock() stores
 *    for an upgradable lock;
 *  - the lock must be owned by this tdb (otherwise owner_conflict()).
 *
 * It then requests F_WRLCK from TDB_HASH_LOCK_START with length 0 using
 * TDB_LOCK_WAIT|TDB_LOCK_PROBE.  On success the recorded type becomes
 * F_WRLCK and `off` is cleared.  errno is compared against EDEADLK and
 * select() serves as a short sleep between attempts; if the upgrade is not
 * obtained, "tdb_allrecord_upgrade failed" is logged and returned.
 *
 * NOTE(review): the retry loop header and its bound are not visible in
 * this copy; confirm how many attempts are made.
 * NOTE(review): the "count %u too high" message is also produced when the
 * count is 0, because the test is `!= 1`.
 */
209 enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
213 if (tdb->file->allrecord_lock.count != 1) {
214 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
215 "tdb_allrecord_upgrade failed:"
216 " count %u too high",
217 tdb->file->allrecord_lock.count);
220 if (tdb->file->allrecord_lock.off != 1) {
221 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
222 "tdb_allrecord_upgrade failed:"
223 " already upgraded?");
226 if (tdb->file->allrecord_lock.owner != tdb) {
227 return owner_conflict(tdb, "tdb_allrecord_upgrade");
232 if (tdb_brlock(tdb, F_WRLCK,
233 TDB_HASH_LOCK_START, 0,
234 TDB_LOCK_WAIT|TDB_LOCK_PROBE) == TDB_SUCCESS) {
235 tdb->file->allrecord_lock.ltype = F_WRLCK;
236 tdb->file->allrecord_lock.off = 0;
239 if (errno != EDEADLK) {
242 /* sleep for as short a time as we can - more portable than usleep() */
245 select(0, NULL, NULL, NULL, &tv);
247 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
248 "tdb_allrecord_upgrade failed");
/*
 * find_nestlock - look up the nested-lock record for `offset`.
 *
 * Walks the tdb->file->lockrecs array (num_lockrecs entries) and returns a
 * pointer to the entry whose `off` equals `offset`.  When `owner` is
 * non-NULL the entry's owner is also compared against it.
 *
 * NOTE(review): what happens on an owner mismatch, and the not-found
 * return value, are not visible in this copy (callers compare the result
 * against NULL); confirm.
 */
251 static struct tdb_lock *find_nestlock(struct tdb_context *tdb, tdb_off_t offset,
252 const struct tdb_context *owner)
256 for (i=0; i<tdb->file->num_lockrecs; i++) {
257 if (tdb->file->lockrecs[i].off == offset) {
258 if (owner && tdb->file->lockrecs[i].owner != owner)
260 return &tdb->file->lockrecs[i];
/*
 * tdb_lock_and_recover - run transaction recovery under full locks.
 *
 * Takes the allrecord write lock and then the open lock, calls
 * tdb_transaction_recover(), and releases both locks in reverse order.
 * If the open lock cannot be taken, the allrecord lock is dropped again.
 * Both acquisitions pass TDB_LOCK_WAIT|TDB_LOCK_NOCHECK; NOCHECK matters
 * because tdb_allrecord_lock() and tdb_nest_lock() otherwise run a
 * recovery check that calls back into this function.
 *
 * NOTE(review): the return statements are not visible in this copy;
 * presumably the first failing ecode, else the recovery result -- confirm.
 */
266 enum TDB_ERROR tdb_lock_and_recover(struct tdb_context *tdb)
268 enum TDB_ERROR ecode;
270 ecode = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK,
272 if (ecode != TDB_SUCCESS) {
276 ecode = tdb_lock_open(tdb, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK);
277 if (ecode != TDB_SUCCESS) {
278 tdb_allrecord_unlock(tdb, F_WRLCK);
281 ecode = tdb_transaction_recover(tdb);
282 tdb_unlock_open(tdb);
283 tdb_allrecord_unlock(tdb, F_WRLCK);
/*
 * tdb_nest_lock - take (or re-take) the single-byte lock at `offset`.
 *
 * Visible behaviour:
 *  - rejects offsets beyond TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE +
 *    map_size / 8 with TDB_ERR_LOCK;
 *  - special-cases databases that have TDB_NOLOCK set;
 *  - if a record for `offset` already exists: fails via owner_conflict()
 *    when another tdb owns it, and fails when a write lock is requested
 *    over an existing read lock; otherwise the existing record is reused
 *    (see the "Just increment" comment);
 *  - refuses a new lock inside the hash lock range while any lock record
 *    exists;
 *  - grows lockrecs by one entry with realloc() (TDB_ERR_OOM on failure);
 *  - takes the one-byte fcntl lock through tdb_brlock();
 *  - on the first lock, unless TDB_LOCK_NOCHECK is set, consults
 *    tdb_needs_recovery(); the visible recovery path drops the byte lock,
 *    calls tdb_lock_and_recover() and then re-takes the byte lock;
 *  - finally fills in the new record (owner, off, count = 1, ltype) and
 *    increments num_lockrecs.
 *
 * NOTE(review): several branch conditions and return statements are not
 * visible in this copy; confirm them before relying on this summary.
 */
288 /* lock an offset in the database. */
289 static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
290 tdb_off_t offset, int ltype,
291 enum tdb_lock_flags flags)
293 struct tdb_lock *new_lck;
294 enum TDB_ERROR ecode;
296 if (offset > (TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
297 + tdb->file->map_size / 8)) {
298 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
299 "tdb_nest_lock: invalid offset %zu ltype=%d",
300 (size_t)offset, ltype);
303 if (tdb->flags & TDB_NOLOCK)
306 add_stat(tdb, locks, 1);
308 new_lck = find_nestlock(tdb, offset, NULL);
310 if (new_lck->owner != tdb) {
311 return owner_conflict(tdb, "tdb_nest_lock");
314 if (new_lck->ltype == F_RDLCK && ltype == F_WRLCK) {
315 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
317 " offset %zu has read lock",
320 /* Just increment the struct, posix locks don't stack. */
325 if (tdb->file->num_lockrecs
326 && offset >= TDB_HASH_LOCK_START
327 && offset < TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE) {
328 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
329 "tdb_nest_lock: already have a hash lock?");
332 new_lck = (struct tdb_lock *)realloc(
334 sizeof(*tdb->file->lockrecs) * (tdb->file->num_lockrecs+1));
335 if (new_lck == NULL) {
336 return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
338 " unable to allocate %zu lock struct",
339 tdb->file->num_lockrecs + 1);
341 tdb->file->lockrecs = new_lck;
343 /* Since fcntl locks don't nest, we do a lock for the first one,
344 and simply bump the count for future ones */
345 ecode = tdb_brlock(tdb, ltype, offset, 1, flags);
346 if (ecode != TDB_SUCCESS) {
350 /* First time we grab a lock, perhaps someone died in commit? */
351 if (!(flags & TDB_LOCK_NOCHECK)
352 && tdb->file->num_lockrecs == 0) {
353 tdb_bool_err berr = tdb_needs_recovery(tdb);
355 tdb_brunlock(tdb, ltype, offset, 1);
359 ecode = tdb_lock_and_recover(tdb);
360 if (ecode == TDB_SUCCESS) {
361 ecode = tdb_brlock(tdb, ltype, offset, 1,
364 if (ecode != TDB_SUCCESS) {
370 tdb->file->lockrecs[tdb->file->num_lockrecs].owner = tdb;
371 tdb->file->lockrecs[tdb->file->num_lockrecs].off = offset;
372 tdb->file->lockrecs[tdb->file->num_lockrecs].count = 1;
373 tdb->file->lockrecs[tdb->file->num_lockrecs].ltype = ltype;
374 tdb->file->num_lockrecs++;
/*
 * tdb_nest_unlock - drop one reference to the nested lock at `off`.
 *
 * Looks up the record owned by this tdb with find_nestlock(); a missing
 * record or a zero count is reported as TDB_ERR_LOCK.  A count above 1 is
 * handled separately from the last reference.  For the last reference the
 * byte is unlocked with tdb_brunlock() and the record is removed by
 * copying the final array element over it while decrementing
 * num_lockrecs, so the order of lockrecs is not preserved.
 *
 * NOTE(review): the TDB_NOLOCK and count > 1 branch bodies and the final
 * return are not visible in this copy; confirm.
 */
379 static enum TDB_ERROR tdb_nest_unlock(struct tdb_context *tdb,
380 tdb_off_t off, int ltype)
382 struct tdb_lock *lck;
383 enum TDB_ERROR ecode;
385 if (tdb->flags & TDB_NOLOCK)
388 lck = find_nestlock(tdb, off, tdb);
389 if ((lck == NULL) || (lck->count == 0)) {
390 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
391 "tdb_nest_unlock: no lock for %zu",
395 if (lck->count > 1) {
401 * This lock has count==1 left, so we need to unlock it in the
402 * kernel. We don't bother with decrementing the in-memory array
403 * element, we're about to overwrite it with the last array element
406 ecode = tdb_brunlock(tdb, ltype, off, 1);
409 * Shrink the array by overwriting the element just unlocked with the
410 * last array element.
412 *lck = tdb->file->lockrecs[--tdb->file->num_lockrecs];
418 /* get the transaction lock */
/*
 * tdb_transaction_lock - take the lock at TDB_TRANSACTION_LOCK.
 *
 * Thin wrapper: calls tdb_nest_lock() with the requested `ltype` and
 * TDB_LOCK_WAIT, and returns its result.
 */
420 enum TDB_ERROR tdb_transaction_lock(struct tdb_context *tdb, int ltype)
422 return tdb_nest_lock(tdb, TDB_TRANSACTION_LOCK, ltype, TDB_LOCK_WAIT);
426 /* release the transaction lock */
/*
 * tdb_transaction_unlock - release the lock at TDB_TRANSACTION_LOCK.
 *
 * Thin wrapper around tdb_nest_unlock(); the function returns void, so
 * the result of tdb_nest_unlock() is not reported to the caller.
 */
428 void tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
430 tdb_nest_unlock(tdb, TDB_TRANSACTION_LOCK, ltype);
/*
 * tdb_lock_gradual - lock the range starting at `off`, `len` bytes long,
 * piece by piece.
 *
 * A non-blocking attempt (TDB_LOCK_WAIT cleared from `flags`) is made on
 * the whole range first.  If that does not succeed the range is split:
 * the first len / 2 bytes and then the remaining len - len / 2 bytes are
 * locked recursively with the caller's original flags.  If the second
 * half fails, the first half is unlocked again.  The base case (see the
 * "Single hash" comment) is a single tdb_brlock() with the original flags.
 *
 * NOTE(review): the base-case condition and the return statements are not
 * visible in this copy; confirm.
 */
433 /* We only need to lock individual bytes, but Linux merges consecutive locks
434 * so we lock in contiguous ranges. */
435 static enum TDB_ERROR tdb_lock_gradual(struct tdb_context *tdb,
436 int ltype, enum tdb_lock_flags flags,
437 tdb_off_t off, tdb_off_t len)
439 enum TDB_ERROR ecode;
440 enum tdb_lock_flags nb_flags = (flags & ~TDB_LOCK_WAIT);
443 /* 0 would mean to end-of-file... */
445 /* Single hash. Just do blocking lock. */
446 return tdb_brlock(tdb, ltype, off, len, flags);
449 /* First we try non-blocking. */
450 if (tdb_brlock(tdb, ltype, off, len, nb_flags) == TDB_SUCCESS) {
454 /* Try locking first half, then second. */
455 ecode = tdb_lock_gradual(tdb, ltype, flags, off, len / 2);
456 if (ecode != TDB_SUCCESS)
459 ecode = tdb_lock_gradual(tdb, ltype, flags,
460 off + len / 2, len - len / 2);
461 if (ecode != TDB_SUCCESS) {
462 tdb_brunlock(tdb, ltype, off, len / 2);
/*
 * tdb_allrecord_lock - lock the whole database with `ltype`.
 *
 * If an allrecord lock is already held: another owner is an
 * owner_conflict(); a compatible request bumps the nesting count; anything
 * else is reported as "already have %s lock".
 * Otherwise the call is refused while hash (chain) locks are held, and
 * `upgradable` is only accepted together with F_RDLCK.
 *
 * Locking happens in two steps: the hash range via tdb_lock_gradual(),
 * then a tdb_brlock() starting at TDB_HASH_LOCK_START +
 * TDB_HASH_LOCK_RANGE (the free tables, per the comment below).  If the
 * second step fails the hash range is unlocked again.  Failures are logged
 * unless TDB_LOCK_PROBE is set.
 *
 * On success the owner, count (1), type and `off` field are recorded; an
 * upgradable lock is recorded as F_WRLCK with `off` set.  Unless
 * TDB_LOCK_NOCHECK is given, tdb_needs_recovery() is then consulted; the
 * visible recovery path releases the lock and calls
 * tdb_lock_and_recover().
 *
 * NOTE(review): several conditions and return statements are not visible
 * in this copy; confirm them before relying on this summary.
 */
467 /* lock/unlock entire database. It can only be upgradable if you have some
468 * other way of guaranteeing exclusivity (ie. transaction write lock). */
469 enum TDB_ERROR tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
470 enum tdb_lock_flags flags, bool upgradable)
472 enum TDB_ERROR ecode;
475 if (tdb->file->allrecord_lock.count) {
476 if (tdb->file->allrecord_lock.owner != tdb) {
477 return owner_conflict(tdb, "tdb_allrecord_lock");
481 || tdb->file->allrecord_lock.ltype == F_WRLCK) {
482 tdb->file->allrecord_lock.count++;
486 /* a global lock of a different type exists */
487 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
488 "tdb_allrecord_lock: already have %s lock",
489 tdb->file->allrecord_lock.ltype == F_RDLCK
493 if (tdb_has_hash_locks(tdb)) {
494 /* can't combine global and chain locks */
495 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
496 "tdb_allrecord_lock:"
497 " already have chain lock");
500 if (upgradable && ltype != F_RDLCK) {
501 /* tdb error: you can't upgrade a write lock! */
502 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
503 "tdb_allrecord_lock:"
504 " can't upgrade a write lock");
507 add_stat(tdb, locks, 1);
509 /* Lock hashes, gradually. */
510 ecode = tdb_lock_gradual(tdb, ltype, flags, TDB_HASH_LOCK_START,
511 TDB_HASH_LOCK_RANGE);
512 if (ecode != TDB_SUCCESS) {
513 if (!(flags & TDB_LOCK_PROBE)) {
514 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
515 "tdb_allrecord_lock hashes failed");
520 /* Lock free tables: there to end of file. */
521 ecode = tdb_brlock(tdb, ltype,
522 TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
524 if (ecode != TDB_SUCCESS) {
525 if (!(flags & TDB_LOCK_PROBE)) {
526 tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
527 "tdb_allrecord_lock freetables failed");
529 tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START,
530 TDB_HASH_LOCK_RANGE);
534 tdb->file->allrecord_lock.owner = tdb;
535 tdb->file->allrecord_lock.count = 1;
536 /* If it's upgradable, it's actually exclusive so we can treat
537 * it as a write lock. */
538 tdb->file->allrecord_lock.ltype = upgradable ? F_WRLCK : ltype;
539 tdb->file->allrecord_lock.off = upgradable;
541 /* Now check for needing recovery. */
542 if (flags & TDB_LOCK_NOCHECK)
545 berr = tdb_needs_recovery(tdb);
546 if (likely(berr == false))
549 tdb_allrecord_unlock(tdb, ltype);
552 ecode = tdb_lock_and_recover(tdb);
553 if (ecode != TDB_SUCCESS) {
/*
 * tdb_lock_open - take the open lock.
 *
 * Acquires a write (F_WRLCK) nested lock at TDB_OPEN_LOCK through
 * tdb_nest_lock(), passing `flags` through, and returns its result.
 */
559 enum TDB_ERROR tdb_lock_open(struct tdb_context *tdb, enum tdb_lock_flags flags)
561 return tdb_nest_lock(tdb, TDB_OPEN_LOCK, F_WRLCK, flags);
/*
 * tdb_unlock_open - release the open lock.
 *
 * Calls tdb_nest_unlock() for the F_WRLCK lock at TDB_OPEN_LOCK; the
 * function returns void, so that call's result is not reported.
 */
564 void tdb_unlock_open(struct tdb_context *tdb)
566 tdb_nest_unlock(tdb, TDB_OPEN_LOCK, F_WRLCK);
/*
 * tdb_has_open_lock - does this tdb currently hold the open lock?
 *
 * True only when TDB_NOLOCK is not set and find_nestlock() finds a
 * TDB_OPEN_LOCK record owned by this tdb.
 */
569 bool tdb_has_open_lock(struct tdb_context *tdb)
571 return !(tdb->flags & TDB_NOLOCK)
572 && find_nestlock(tdb, TDB_OPEN_LOCK, tdb) != NULL;
/*
 * tdb_lock_expand - take the expansion lock with the given `ltype`.
 *
 * Calls tdb_nest_lock() at TDB_EXPANSION_LOCK with
 * TDB_LOCK_WAIT | TDB_LOCK_NOCHECK and returns its result.
 */
575 enum TDB_ERROR tdb_lock_expand(struct tdb_context *tdb, int ltype)
577 /* Lock doesn't protect data, so don't check (we recurse if we do!) */
578 return tdb_nest_lock(tdb, TDB_EXPANSION_LOCK, ltype,
579 TDB_LOCK_WAIT | TDB_LOCK_NOCHECK);
/*
 * tdb_unlock_expand - release the expansion lock.
 *
 * Calls tdb_nest_unlock() at TDB_EXPANSION_LOCK with `ltype`; the function
 * returns void, so that call's result is not reported.
 */
582 void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
584 tdb_nest_unlock(tdb, TDB_EXPANSION_LOCK, ltype);
/*
 * tdb_allrecord_unlock - release one level of the allrecord lock.
 *
 * Logs an error when no allrecord lock is held, when it is held by a
 * different tdb, or when `ltype` does not match the recorded type.  The
 * type check tolerates F_RDLCK for a lock whose `off` field is set, since
 * upgradable locks are recorded as write locks.  A count above 1 is
 * decremented; for the last level count and ltype are reset to 0 and the
 * range starting at TDB_HASH_LOCK_START (length 0) is unlocked with
 * tdb_brunlock(), whose result is not checked.
 *
 * NOTE(review): the early returns after the error logs and after the
 * decrement are not visible in this copy; confirm.
 */
587 /* unlock entire db */
588 void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
590 if (tdb->file->allrecord_lock.count == 0) {
591 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
592 "tdb_allrecord_unlock: not locked!");
596 if (tdb->file->allrecord_lock.owner != tdb) {
597 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
598 "tdb_allrecord_unlock: not locked by us!");
602 /* Upgradable locks are marked as write locks. */
603 if (tdb->file->allrecord_lock.ltype != ltype
604 && (!tdb->file->allrecord_lock.off || ltype != F_RDLCK)) {
605 tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
606 "tdb_allrecord_unlock: have %s lock",
607 tdb->file->allrecord_lock.ltype == F_RDLCK
612 if (tdb->file->allrecord_lock.count > 1) {
613 tdb->file->allrecord_lock.count--;
617 tdb->file->allrecord_lock.count = 0;
618 tdb->file->allrecord_lock.ltype = 0;
620 tdb_brunlock(tdb, ltype, TDB_HASH_LOCK_START, 0);
/*
 * tdb_has_expansion_lock - does this tdb hold the expansion lock?
 *
 * True when find_nestlock() finds a TDB_EXPANSION_LOCK record owned by
 * this tdb.
 */
623 bool tdb_has_expansion_lock(struct tdb_context *tdb)
625 return find_nestlock(tdb, TDB_EXPANSION_LOCK, tdb) != NULL;
/*
 * tdb_has_hash_locks - is any lock record inside the hash lock range?
 *
 * Scans tdb->file->lockrecs for an entry whose `off` lies in
 * [TDB_HASH_LOCK_START, TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE).
 * The visible test does not look at the record's owner.
 *
 * NOTE(review): the return statements are not visible in this copy;
 * presumably true on the first match and false otherwise -- confirm.
 */
628 bool tdb_has_hash_locks(struct tdb_context *tdb)
632 for (i=0; i<tdb->file->num_lockrecs; i++) {
633 if (tdb->file->lockrecs[i].off >= TDB_HASH_LOCK_START
634 && tdb->file->lockrecs[i].off < (TDB_HASH_LOCK_START
635 + TDB_HASH_LOCK_RANGE))
/*
 * tdb_has_free_lock - is any lock record above the hash lock range?
 *
 * Special-cases TDB_NOLOCK, then scans tdb->file->lockrecs for an entry
 * whose `off` is greater than TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE,
 * which is where free_lock_off() places free-bucket locks.
 *
 * NOTE(review): the return statements are not visible in this copy;
 * confirm.
 */
641 static bool tdb_has_free_lock(struct tdb_context *tdb)
645 if (tdb->flags & TDB_NOLOCK)
648 for (i=0; i<tdb->file->num_lockrecs; i++) {
649 if (tdb->file->lockrecs[i].off
650 > TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE)
/*
 * tdb_lock_hashes - lock the hash chain lock selected by `hash_lock`.
 *
 * The lock offset is TDB_HASH_LOCK_START plus hash_lock shifted right by
 * (64 - TDB_HASH_LOCK_RANGE_BITS).  `hash_range` is not used by the
 * visible code (see the FIXME).
 *
 * If an allrecord lock is held, no per-chain lock is taken: another owner
 * is an owner_conflict(), a request of the same type or F_RDLCK is
 * accepted, and anything else is a TDB_ERR_LOCK usage error.  Otherwise
 * the call fails when a free-bucket lock or the expansion lock is already
 * held, and finally defers to tdb_nest_lock().
 *
 * NOTE(review): the parameter line declaring `hash_lock` and the return
 * for the accepted allrecord case are not visible in this copy; confirm.
 */
656 enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
658 tdb_len_t hash_range,
659 int ltype, enum tdb_lock_flags waitflag)
661 /* FIXME: Do this properly, using hlock_range */
662 unsigned lock = TDB_HASH_LOCK_START
663 + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
665 /* an allrecord lock allows us to avoid per chain locks */
666 if (tdb->file->allrecord_lock.count) {
667 if (tdb->file->allrecord_lock.owner != tdb)
668 return owner_conflict(tdb, "tdb_lock_hashes");
669 if (ltype == tdb->file->allrecord_lock.ltype
670 || ltype == F_RDLCK) {
674 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
676 " already have %s allrecordlock",
677 tdb->file->allrecord_lock.ltype == F_RDLCK
681 if (tdb_has_free_lock(tdb)) {
682 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
683 "tdb_lock_hashes: already have free lock");
686 if (tdb_has_expansion_lock(tdb)) {
687 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
689 " already have expansion lock");
692 return tdb_nest_lock(tdb, lock, ltype, waitflag);
/*
 * tdb_unlock_hashes - release the hash chain lock selected by `hash_lock`.
 *
 * Uses the same offset computation as tdb_lock_hashes().  Special-cases
 * TDB_NOLOCK.  While an allrecord lock is held, asking to release F_WRLCK
 * under an F_RDLCK allrecord lock is a TDB_ERR_LOCK error.  Otherwise the
 * call defers to tdb_nest_unlock() and returns its result.
 *
 * NOTE(review): the parameter line declaring `hash_lock` and the returns
 * for the TDB_NOLOCK and allrecord cases are not visible in this copy;
 * confirm.
 */
695 enum TDB_ERROR tdb_unlock_hashes(struct tdb_context *tdb,
697 tdb_len_t hash_range, int ltype)
699 unsigned lock = TDB_HASH_LOCK_START
700 + (hash_lock >> (64 - TDB_HASH_LOCK_RANGE_BITS));
702 if (tdb->flags & TDB_NOLOCK)
705 /* an allrecord lock allows us to avoid per chain locks */
706 if (tdb->file->allrecord_lock.count) {
707 if (tdb->file->allrecord_lock.ltype == F_RDLCK
708 && ltype == F_WRLCK) {
709 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
710 "tdb_unlock_hashes RO allrecord!");
715 return tdb_nest_unlock(tdb, lock, ltype);
718 /* Hash locks use TDB_HASH_LOCK_START + the next 30 bits.
719 * Then we begin; bucket offsets are sizeof(tdb_len_t) apart, so we divide.
720 * The result is that on 32 bit systems we don't use lock values > 2^31 on
721 * files that are less than 4GB. */
/*
 * free_lock_off - map a free-bucket offset to its lock offset.
 *
 * Returns TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE +
 * b_off / sizeof(tdb_off_t), so free-bucket locks are placed after the
 * hash lock range.
 */
723 static tdb_off_t free_lock_off(tdb_off_t b_off)
725 return TDB_HASH_LOCK_START + TDB_HASH_LOCK_RANGE
726 + b_off / sizeof(tdb_off_t);
/*
 * tdb_lock_free_bucket - write-lock the free bucket at `b_off`.
 *
 * Asserts that b_off is at least sizeof(struct tdb_header) and
 * special-cases TDB_NOLOCK.  While an allrecord lock is held the recorded
 * type is checked against F_WRLCK; the visible error path reports
 * "read-only allrecordlock".  The call also fails when the expansion lock
 * is already held.  Otherwise it takes an F_WRLCK nested lock at
 * free_lock_off(b_off) via tdb_nest_lock() and returns its result.
 *
 * NOTE(review): the returns for the TDB_NOLOCK and write-allrecord cases
 * are not visible in this copy; confirm.
 */
729 enum TDB_ERROR tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
730 enum tdb_lock_flags waitflag)
732 assert(b_off >= sizeof(struct tdb_header));
734 if (tdb->flags & TDB_NOLOCK)
737 /* an allrecord lock allows us to avoid per chain locks */
738 if (tdb->file->allrecord_lock.count) {
739 if (tdb->file->allrecord_lock.ltype == F_WRLCK)
741 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
742 "tdb_lock_free_bucket with"
743 " read-only allrecordlock!");
747 if (tdb_has_expansion_lock(tdb)) {
748 return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
749 "tdb_lock_free_bucket:"
750 " already have expansion lock");
754 return tdb_nest_lock(tdb, free_lock_off(b_off), F_WRLCK, waitflag);
/*
 * tdb_unlock_free_bucket - release the free-bucket lock for `b_off`.
 *
 * Checks whether an allrecord lock is held, then calls tdb_nest_unlock()
 * for the F_WRLCK lock at free_lock_off(b_off); that call's result is not
 * reported (the function returns void).
 *
 * NOTE(review): the statement guarded by the allrecord check is not
 * visible in this copy; presumably an early return -- confirm.
 */
757 void tdb_unlock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off)
759 if (tdb->file->allrecord_lock.count)
762 tdb_nest_unlock(tdb, free_lock_off(b_off), F_WRLCK);
765 void tdb_unlock_all(struct tdb_context *tdb)
769 while (tdb->file->allrecord_lock.count
770 && tdb->file->allrecord_lock.owner == tdb) {
771 tdb_allrecord_unlock(tdb, tdb->file->allrecord_lock.ltype);
774 for (i=0; i<tdb->file->num_lockrecs; i++) {
775 if (tdb->file->lockrecs[i].owner == tdb) {
777 tdb->file->lockrecs[i].off,
778 tdb->file->lockrecs[i].ltype);