]> git.ozlabs.org Git - ccan/blob - ccan/tdb/lock.c
117596a3f203814daa9e4611d96b1216de050643
[ccan] / ccan / tdb / lock.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 #define TDB_MARK_LOCK 0x80000000
31
32 void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
33 {
34         tdb->interrupt_sig_ptr = ptr;
35 }
36
37 /* a byte range locking function - return 0 on success
38    this functions locks/unlocks 1 byte at the specified offset.
39
40    On error, errno is also set so that errors are passed back properly
41    through tdb_open(). 
42
43    note that a len of zero means lock to end of file
44 */
45 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
46                int rw_type, int lck_type, int probe, size_t len)
47 {
48         struct flock fl;
49         int ret;
50
51         if (tdb->flags & TDB_NOLOCK) {
52                 return 0;
53         }
54
55         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
56                 tdb->ecode = TDB_ERR_RDONLY;
57                 return -1;
58         }
59
60         fl.l_type = rw_type;
61         fl.l_whence = SEEK_SET;
62         fl.l_start = offset;
63         fl.l_len = len;
64         fl.l_pid = 0;
65
66         do {
67                 ret = fcntl(tdb->fd,lck_type,&fl);
68
69                 /* Check for a sigalarm break. */
70                 if (ret == -1 && errno == EINTR &&
71                                 tdb->interrupt_sig_ptr &&
72                                 *tdb->interrupt_sig_ptr) {
73                         break;
74                 }
75         } while (ret == -1 && errno == EINTR);
76
77         if (ret == -1) {
78                 /* Generic lock error. errno set by fcntl.
79                  * EAGAIN is an expected return from non-blocking
80                  * locks. */
81                 if (!probe && lck_type != F_SETLK) {
82                         /* Ensure error code is set for log fun to examine. */
83                         tdb->ecode = TDB_ERR_LOCK;
84                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 
85                                  tdb->fd, offset, rw_type, lck_type, (int)len));
86                 }
87                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
88         }
89         return 0;
90 }
91
92
93 /*
94   upgrade a read lock to a write lock. This needs to be handled in a
95   special way as some OSes (such as solaris) have too conservative
96   deadlock detection and claim a deadlock when progress can be
97   made. For those OSes we may loop for a while.  
98 */
99 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
100 {
101         int count = 1000;
102         while (count--) {
103                 struct timeval tv;
104                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
105                         return 0;
106                 }
107                 if (errno != EDEADLK) {
108                         break;
109                 }
110                 /* sleep for as short a time as we can - more portable than usleep() */
111                 tv.tv_sec = 0;
112                 tv.tv_usec = 1;
113                 select(0, NULL, NULL, NULL, &tv);
114         }
115         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
116         return -1;
117 }
118
119
120 /* lock a list in the database. list -1 is the alloc list */
121 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
122 {
123         struct tdb_lock_type *new_lck;
124         int i;
125         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
126
127         ltype &= ~TDB_MARK_LOCK;
128
129         /* a global lock allows us to avoid per chain locks */
130         if (tdb->global_lock.count && 
131             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
132                 return 0;
133         }
134
135         if (tdb->global_lock.count) {
136                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
137         }
138
139         if (list < -1 || list >= (int)tdb->header.hash_size) {
140                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n", 
141                            list, ltype));
142                 return -1;
143         }
144         if (tdb->flags & TDB_NOLOCK)
145                 return 0;
146
147         for (i=0; i<tdb->num_lockrecs; i++) {
148                 if (tdb->lockrecs[i].list == list) {
149                         if (tdb->lockrecs[i].count == 0) {
150                                 /*
151                                  * Can't happen, see tdb_unlock(). It should
152                                  * be an assert.
153                                  */
154                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
155                                          "lck->count == 0 for list %d", list));
156                         }
157                         /*
158                          * Just increment the in-memory struct, posix locks
159                          * don't stack.
160                          */
161                         tdb->lockrecs[i].count++;
162                         return 0;
163                 }
164         }
165
166         new_lck = (struct tdb_lock_type *)realloc(
167                 tdb->lockrecs,
168                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
169         if (new_lck == NULL) {
170                 errno = ENOMEM;
171                 return -1;
172         }
173         tdb->lockrecs = new_lck;
174
175         /* Since fcntl locks don't nest, we do a lock for the first one,
176            and simply bump the count for future ones */
177         if (!mark_lock &&
178             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
179                                      0, 1)) {
180                 return -1;
181         }
182
183         tdb->num_locks++;
184
185         tdb->lockrecs[tdb->num_lockrecs].list = list;
186         tdb->lockrecs[tdb->num_lockrecs].count = 1;
187         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
188         tdb->num_lockrecs += 1;
189
190         return 0;
191 }
192
193 /* lock a list in the database. list -1 is the alloc list */
194 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
195 {
196         int ret;
197         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
198         if (ret) {
199                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
200                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
201         }
202         return ret;
203 }
204
205 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
206 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
207 {
208         return _tdb_lock(tdb, list, ltype, F_SETLK);
209 }
210
211
212 /* unlock the database: returns void because it's too late for errors. */
213         /* changed to return int it may be interesting to know there
214            has been an error  --simo */
215 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
216 {
217         int ret = -1;
218         int i;
219         struct tdb_lock_type *lck = NULL;
220         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
221
222         ltype &= ~TDB_MARK_LOCK;
223
224         /* a global lock allows us to avoid per chain locks */
225         if (tdb->global_lock.count && 
226             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
227                 return 0;
228         }
229
230         if (tdb->global_lock.count) {
231                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
232         }
233
234         if (tdb->flags & TDB_NOLOCK)
235                 return 0;
236
237         /* Sanity checks */
238         if (list < -1 || list >= (int)tdb->header.hash_size) {
239                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
240                 return ret;
241         }
242
243         for (i=0; i<tdb->num_lockrecs; i++) {
244                 if (tdb->lockrecs[i].list == list) {
245                         lck = &tdb->lockrecs[i];
246                         break;
247                 }
248         }
249
250         if ((lck == NULL) || (lck->count == 0)) {
251                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
252                 return -1;
253         }
254
255         if (lck->count > 1) {
256                 lck->count--;
257                 return 0;
258         }
259
260         /*
261          * This lock has count==1 left, so we need to unlock it in the
262          * kernel. We don't bother with decrementing the in-memory array
263          * element, we're about to overwrite it with the last array element
264          * anyway.
265          */
266
267         if (mark_lock) {
268                 ret = 0;
269         } else {
270                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
271                                                F_SETLKW, 0, 1);
272         }
273         tdb->num_locks--;
274
275         /*
276          * Shrink the array by overwriting the element just unlocked with the
277          * last array element.
278          */
279
280         if (tdb->num_lockrecs > 1) {
281                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
282         }
283         tdb->num_lockrecs -= 1;
284
285         /*
286          * We don't bother with realloc when the array shrinks, but if we have
287          * a completely idle tdb we should get rid of the locked array.
288          */
289
290         if (tdb->num_lockrecs == 0) {
291                 SAFE_FREE(tdb->lockrecs);
292         }
293
294         if (ret)
295                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n")); 
296         return ret;
297 }
298
299 /*
300   get the transaction lock
301  */
302 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
303 {
304         if (tdb->have_transaction_lock || tdb->global_lock.count) {
305                 tdb->have_transaction_lock++;
306                 return 0;
307         }
308         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, 
309                                      F_SETLKW, 0, 1) == -1) {
310                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
311                 tdb->ecode = TDB_ERR_LOCK;
312                 return -1;
313         }
314         tdb->have_transaction_lock++;
315         return 0;
316 }
317
318 /*
319   release the transaction lock
320  */
321 int tdb_transaction_unlock(struct tdb_context *tdb)
322 {
323         if (--tdb->have_transaction_lock) {
324                 return 0;
325         }
326         return tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
327 }
328
329
330
331
332 /* lock/unlock entire database */
333 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
334 {
335         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
336
337         ltype &= ~TDB_MARK_LOCK;
338
339         /* There are no locks on read-only dbs */
340         if (tdb->read_only || tdb->traverse_read)
341                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
342
343         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
344                 tdb->global_lock.count++;
345                 return 0;
346         }
347
348         if (tdb->global_lock.count) {
349                 /* a global lock of a different type exists */
350                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
351         }
352         
353         if (tdb->num_locks != 0) {
354                 /* can't combine global and chain locks */
355                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
356         }
357
358         if (!mark_lock &&
359             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
360                                      0, 4*tdb->header.hash_size)) {
361                 if (op == F_SETLKW) {
362                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
363                 }
364                 return -1;
365         }
366
367         tdb->global_lock.count = 1;
368         tdb->global_lock.ltype = ltype;
369
370         return 0;
371 }
372
373
374
375 /* unlock entire db */
376 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
377 {
378         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
379
380         ltype &= ~TDB_MARK_LOCK;
381
382         /* There are no locks on read-only dbs */
383         if (tdb->read_only || tdb->traverse_read) {
384                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
385         }
386
387         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
388                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
389         }
390
391         if (tdb->global_lock.count > 1) {
392                 tdb->global_lock.count--;
393                 return 0;
394         }
395
396         if (!mark_lock &&
397             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 
398                                      0, 4*tdb->header.hash_size)) {
399                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
400                 return -1;
401         }
402
403         tdb->global_lock.count = 0;
404         tdb->global_lock.ltype = 0;
405
406         return 0;
407 }
408
409 /* lock entire database with write lock */
410 int tdb_lockall(struct tdb_context *tdb)
411 {
412         tdb_trace(tdb, "tdb_lockall");
413         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
414 }
415
416 /* lock entire database with write lock - mark only */
417 int tdb_lockall_mark(struct tdb_context *tdb)
418 {
419         tdb_trace(tdb, "tdb_lockall_mark");
420         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
421 }
422
423 /* unlock entire database with write lock - unmark only */
424 int tdb_lockall_unmark(struct tdb_context *tdb)
425 {
426         tdb_trace(tdb, "tdb_lockall_unmark");
427         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
428 }
429
430 /* lock entire database with write lock - nonblocking varient */
431 int tdb_lockall_nonblock(struct tdb_context *tdb)
432 {
433         int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);
434         tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
435         return ret;
436 }
437
438 /* unlock entire database with write lock */
439 int tdb_unlockall(struct tdb_context *tdb)
440 {
441         tdb_trace(tdb, "tdb_unlockall");
442         return _tdb_unlockall(tdb, F_WRLCK);
443 }
444
445 /* lock entire database with read lock */
446 int tdb_lockall_read(struct tdb_context *tdb)
447 {
448         tdb_trace(tdb, "tdb_lockall_read");
449         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
450 }
451
452 /* lock entire database with read lock - nonblock varient */
453 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
454 {
455         int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);
456         tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
457         return ret;
458 }
459
460 /* unlock entire database with read lock */
461 int tdb_unlockall_read(struct tdb_context *tdb)
462 {
463         tdb_trace(tdb, "tdb_unlockall_read");
464         return _tdb_unlockall(tdb, F_RDLCK);
465 }
466
467 /* lock/unlock one hash chain. This is meant to be used to reduce
468    contention - it cannot guarantee how many records will be locked */
469 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
470 {
471         int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
472         tdb_trace_1rec(tdb, "tdb_chainlock", key);
473         return ret;
474 }
475
476 /* lock/unlock one hash chain, non-blocking. This is meant to be used
477    to reduce contention - it cannot guarantee how many records will be
478    locked */
479 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
480 {
481         int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
482         tdb_trace_1rec_ret(tdb, "tdb_chainlock_nonblock", key, ret);
483         return ret;
484 }
485
486 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
487 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
488 {
489         int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
490         tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
491         return ret;
492 }
493
494 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
495 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
496 {
497         tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
498         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
499 }
500
501 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
502 {
503         tdb_trace_1rec(tdb, "tdb_chainunlock", key);
504         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
505 }
506
507 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
508 {
509         int ret;
510         ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
511         tdb_trace_1rec(tdb, "tdb_chainlock_read", key);
512         return ret;
513 }
514
515 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
516 {
517         tdb_trace_1rec(tdb, "tdb_chainunlock_read", key);
518         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
519 }
520
521 /* record lock stops delete underneath */
522 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
523 {
524         if (tdb->global_lock.count) {
525                 return 0;
526         }
527         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
528 }
529
530 /*
531   Write locks override our own fcntl readlocks, so check it here.
532   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
533   an error to fail to get the lock here.
534 */
535 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
536 {
537         struct tdb_traverse_lock *i;
538         for (i = &tdb->travlocks; i; i = i->next)
539                 if (i->off == off)
540                         return -1;
541         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
542 }
543
544 /*
545   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
546   an error to fail to get the lock here.
547 */
548 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
549 {
550         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
551 }
552
553 /* fcntl locks don't stack: avoid unlocking someone else's */
554 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
555 {
556         struct tdb_traverse_lock *i;
557         uint32_t count = 0;
558
559         if (tdb->global_lock.count) {
560                 return 0;
561         }
562
563         if (off == 0)
564                 return 0;
565         for (i = &tdb->travlocks; i; i = i->next)
566                 if (i->off == off)
567                         count++;
568         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
569 }