]> git.ozlabs.org Git - ccan/blob - ccan/tdb/lock.c
Merge.
[ccan] / ccan / tdb / lock.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 #define TDB_MARK_LOCK 0x80000000
31
32 void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
33 {
34         tdb->interrupt_sig_ptr = ptr;
35 }
36
37 /* a byte range locking function - return 0 on success
38    this functions locks/unlocks 1 byte at the specified offset.
39
40    On error, errno is also set so that errors are passed back properly
41    through tdb_open(). 
42
43    note that a len of zero means lock to end of file
44 */
45 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
46                int rw_type, int lck_type, int probe, size_t len)
47 {
48         struct flock fl;
49         int ret;
50
51         if (tdb->flags & TDB_NOLOCK) {
52                 return 0;
53         }
54
55         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
56                 tdb->ecode = TDB_ERR_RDONLY;
57                 return -1;
58         }
59
60         fl.l_type = rw_type;
61         fl.l_whence = SEEK_SET;
62         fl.l_start = offset;
63         fl.l_len = len;
64         fl.l_pid = 0;
65
66         do {
67                 ret = fcntl(tdb->fd,lck_type,&fl);
68
69                 /* Check for a sigalarm break. */
70                 if (ret == -1 && errno == EINTR &&
71                                 tdb->interrupt_sig_ptr &&
72                                 *tdb->interrupt_sig_ptr) {
73                         break;
74                 }
75         } while (ret == -1 && errno == EINTR);
76
77         if (ret == -1) {
78                 /* Generic lock error. errno set by fcntl.
79                  * EAGAIN is an expected return from non-blocking
80                  * locks. */
81                 if (!probe && lck_type != F_SETLK) {
82                         /* Ensure error code is set for log fun to examine. */
83                         tdb->ecode = TDB_ERR_LOCK;
84                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 
85                                  tdb->fd, offset, rw_type, lck_type, (int)len));
86                 }
87                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
88         }
89         return 0;
90 }
91
92
93 /*
94   upgrade a read lock to a write lock. This needs to be handled in a
95   special way as some OSes (such as solaris) have too conservative
96   deadlock detection and claim a deadlock when progress can be
97   made. For those OSes we may loop for a while.  
98 */
99 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
100 {
101         int count = 1000;
102         while (count--) {
103                 struct timeval tv;
104                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
105                         return 0;
106                 }
107                 if (errno != EDEADLK) {
108                         break;
109                 }
110                 /* sleep for as short a time as we can - more portable than usleep() */
111                 tv.tv_sec = 0;
112                 tv.tv_usec = 1;
113                 select(0, NULL, NULL, NULL, &tv);
114         }
115         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
116         return -1;
117 }
118
119
120 /* lock a list in the database. list -1 is the alloc list */
121 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
122 {
123         struct tdb_lock_type *new_lck;
124         int i;
125         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
126
127         ltype &= ~TDB_MARK_LOCK;
128
129         /* a global lock allows us to avoid per chain locks */
130         if (tdb->global_lock.count && 
131             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
132                 return 0;
133         }
134
135         if (tdb->global_lock.count) {
136                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
137         }
138
139         if (list < -1 || list >= (int)tdb->header.hash_size) {
140                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n", 
141                            list, ltype));
142                 return -1;
143         }
144         if (tdb->flags & TDB_NOLOCK)
145                 return 0;
146
147         for (i=0; i<tdb->num_lockrecs; i++) {
148                 if (tdb->lockrecs[i].list == list) {
149                         if (tdb->lockrecs[i].count == 0) {
150                                 /*
151                                  * Can't happen, see tdb_unlock(). It should
152                                  * be an assert.
153                                  */
154                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
155                                          "lck->count == 0 for list %d", list));
156                         }
157                         /*
158                          * Just increment the in-memory struct, posix locks
159                          * don't stack.
160                          */
161                         tdb->lockrecs[i].count++;
162                         return 0;
163                 }
164         }
165
166         new_lck = (struct tdb_lock_type *)realloc(
167                 tdb->lockrecs,
168                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
169         if (new_lck == NULL) {
170                 errno = ENOMEM;
171                 return -1;
172         }
173         tdb->lockrecs = new_lck;
174
175         /* Since fcntl locks don't nest, we do a lock for the first one,
176            and simply bump the count for future ones */
177         if (!mark_lock &&
178             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
179                                      0, 1)) {
180                 return -1;
181         }
182
183         tdb->num_locks++;
184
185         tdb->lockrecs[tdb->num_lockrecs].list = list;
186         tdb->lockrecs[tdb->num_lockrecs].count = 1;
187         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
188         tdb->num_lockrecs += 1;
189
190         return 0;
191 }
192
193 /* lock a list in the database. list -1 is the alloc list */
194 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
195 {
196         int ret;
197         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
198         if (ret) {
199                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
200                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
201         }
202         return ret;
203 }
204
205 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
206 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
207 {
208         return _tdb_lock(tdb, list, ltype, F_SETLK);
209 }
210
211
212 /* unlock the database: returns void because it's too late for errors. */
213         /* changed to return int it may be interesting to know there
214            has been an error  --simo */
215 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
216 {
217         int ret = -1;
218         int i;
219         struct tdb_lock_type *lck = NULL;
220         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
221
222         ltype &= ~TDB_MARK_LOCK;
223
224         /* a global lock allows us to avoid per chain locks */
225         if (tdb->global_lock.count && 
226             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
227                 return 0;
228         }
229
230         if (tdb->global_lock.count) {
231                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
232         }
233
234         if (tdb->flags & TDB_NOLOCK)
235                 return 0;
236
237         /* Sanity checks */
238         if (list < -1 || list >= (int)tdb->header.hash_size) {
239                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
240                 return ret;
241         }
242
243         for (i=0; i<tdb->num_lockrecs; i++) {
244                 if (tdb->lockrecs[i].list == list) {
245                         lck = &tdb->lockrecs[i];
246                         break;
247                 }
248         }
249
250         if ((lck == NULL) || (lck->count == 0)) {
251                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
252                 return -1;
253         }
254
255         if (lck->count > 1) {
256                 lck->count--;
257                 return 0;
258         }
259
260         /*
261          * This lock has count==1 left, so we need to unlock it in the
262          * kernel. We don't bother with decrementing the in-memory array
263          * element, we're about to overwrite it with the last array element
264          * anyway.
265          */
266
267         if (mark_lock) {
268                 ret = 0;
269         } else {
270                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
271                                                F_SETLKW, 0, 1);
272         }
273         tdb->num_locks--;
274
275         /*
276          * Shrink the array by overwriting the element just unlocked with the
277          * last array element.
278          */
279
280         if (tdb->num_lockrecs > 1) {
281                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
282         }
283         tdb->num_lockrecs -= 1;
284
285         /*
286          * We don't bother with realloc when the array shrinks, but if we have
287          * a completely idle tdb we should get rid of the locked array.
288          */
289
290         if (tdb->num_lockrecs == 0) {
291                 SAFE_FREE(tdb->lockrecs);
292         }
293
294         if (ret)
295                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n")); 
296         return ret;
297 }
298
299 /*
300   get the transaction lock
301  */
302 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
303 {
304         if (tdb->global_lock.count) {
305                 return 0;
306         }
307         if (tdb->have_transaction_lock) {
308                 tdb->have_transaction_lock++;
309                 return 0;
310         }
311         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, 
312                                      F_SETLKW, 0, 1) == -1) {
313                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
314                 tdb->ecode = TDB_ERR_LOCK;
315                 return -1;
316         }
317         tdb->have_transaction_lock++;
318         return 0;
319 }
320
321 /*
322   release the transaction lock
323  */
324 int tdb_transaction_unlock(struct tdb_context *tdb)
325 {
326         if (tdb->global_lock.count) {
327                 return 0;
328         }
329         if (--tdb->have_transaction_lock) {
330                 return 0;
331         }
332         return tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
333 }
334
335
336
337
338 /* lock/unlock entire database */
339 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
340 {
341         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
342
343         ltype &= ~TDB_MARK_LOCK;
344
345         /* There are no locks on read-only dbs */
346         if (tdb->read_only || tdb->traverse_read)
347                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
348
349         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
350                 tdb->global_lock.count++;
351                 return 0;
352         }
353
354         if (tdb->global_lock.count) {
355                 /* a global lock of a different type exists */
356                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
357         }
358         
359         if (tdb->num_locks != 0) {
360                 /* can't combine global and chain locks */
361                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
362         }
363
364         if (!mark_lock &&
365             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
366                                      0, 4*tdb->header.hash_size)) {
367                 if (op == F_SETLKW) {
368                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
369                 }
370                 return -1;
371         }
372
373         tdb->global_lock.count = 1;
374         tdb->global_lock.ltype = ltype;
375
376         return 0;
377 }
378
379
380
381 /* unlock entire db */
382 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
383 {
384         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
385
386         ltype &= ~TDB_MARK_LOCK;
387
388         /* There are no locks on read-only dbs */
389         if (tdb->read_only || tdb->traverse_read) {
390                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
391         }
392
393         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
394                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
395         }
396
397         if (tdb->global_lock.count > 1) {
398                 tdb->global_lock.count--;
399                 return 0;
400         }
401
402         if (!mark_lock &&
403             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 
404                                      0, 4*tdb->header.hash_size)) {
405                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
406                 return -1;
407         }
408
409         tdb->global_lock.count = 0;
410         tdb->global_lock.ltype = 0;
411
412         return 0;
413 }
414
415 /* lock entire database with write lock */
416 int tdb_lockall(struct tdb_context *tdb)
417 {
418         tdb_trace(tdb, "tdb_lockall");
419         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
420 }
421
422 /* lock entire database with write lock - mark only */
423 int tdb_lockall_mark(struct tdb_context *tdb)
424 {
425         tdb_trace(tdb, "tdb_lockall_mark");
426         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
427 }
428
429 /* unlock entire database with write lock - unmark only */
430 int tdb_lockall_unmark(struct tdb_context *tdb)
431 {
432         tdb_trace(tdb, "tdb_lockall_unmark");
433         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
434 }
435
436 /* lock entire database with write lock - nonblocking varient */
437 int tdb_lockall_nonblock(struct tdb_context *tdb)
438 {
439         int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);
440         tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
441         return ret;
442 }
443
444 /* unlock entire database with write lock */
445 int tdb_unlockall(struct tdb_context *tdb)
446 {
447         tdb_trace(tdb, "tdb_unlockall");
448         return _tdb_unlockall(tdb, F_WRLCK);
449 }
450
451 /* lock entire database with read lock */
452 int tdb_lockall_read(struct tdb_context *tdb)
453 {
454         tdb_trace(tdb, "tdb_lockall_read");
455         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
456 }
457
458 /* lock entire database with read lock - nonblock varient */
459 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
460 {
461         int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);
462         tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
463         return ret;
464 }
465
466 /* unlock entire database with read lock */
467 int tdb_unlockall_read(struct tdb_context *tdb)
468 {
469         tdb_trace(tdb, "tdb_unlockall_read");
470         return _tdb_unlockall(tdb, F_RDLCK);
471 }
472
473 /* lock/unlock one hash chain. This is meant to be used to reduce
474    contention - it cannot guarantee how many records will be locked */
475 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
476 {
477         int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
478         tdb_trace_1rec(tdb, "tdb_chainlock", key);
479         return ret;
480 }
481
482 /* lock/unlock one hash chain, non-blocking. This is meant to be used
483    to reduce contention - it cannot guarantee how many records will be
484    locked */
485 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
486 {
487         int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
488         tdb_trace_1rec_ret(tdb, "tdb_chainlock_nonblock", key, ret);
489         return ret;
490 }
491
492 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
493 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
494 {
495         int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
496         tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
497         return ret;
498 }
499
500 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
501 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
502 {
503         tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
504         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
505 }
506
507 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
508 {
509         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
510         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
511 }
512
513 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
514 {
515         int ret;
516         ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
517         tdb_trace_1rec(tdb, "tdb_chainlock_read", key);
518         return ret;
519 }
520
521 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
522 {
523         tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
524         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
525 }
526
527 /* record lock stops delete underneath */
528 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
529 {
530         if (tdb->global_lock.count) {
531                 return 0;
532         }
533         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
534 }
535
536 /*
537   Write locks override our own fcntl readlocks, so check it here.
538   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
539   an error to fail to get the lock here.
540 */
541 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
542 {
543         struct tdb_traverse_lock *i;
544         for (i = &tdb->travlocks; i; i = i->next)
545                 if (i->off == off)
546                         return -1;
547         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
548 }
549
550 /*
551   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
552   an error to fail to get the lock here.
553 */
554 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
555 {
556         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
557 }
558
559 /* fcntl locks don't stack: avoid unlocking someone else's */
560 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
561 {
562         struct tdb_traverse_lock *i;
563         uint32_t count = 0;
564
565         if (tdb->global_lock.count) {
566                 return 0;
567         }
568
569         if (off == 0)
570                 return 0;
571         for (i = &tdb->travlocks; i; i = i->next)
572                 if (i->off == off)
573                         count++;
574         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
575 }