]> git.ozlabs.org Git - ccan/blob - ccan/tdb/lock.c
Tracing for tdb operations.
[ccan] / ccan / tdb / lock.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 #define TDB_MARK_LOCK 0x80000000
31
32 void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
33 {
34         tdb->interrupt_sig_ptr = ptr;
35 }
36
37 /* a byte range locking function - return 0 on success
38    this functions locks/unlocks 1 byte at the specified offset.
39
40    On error, errno is also set so that errors are passed back properly
41    through tdb_open(). 
42
43    note that a len of zero means lock to end of file
44 */
45 int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
46                int rw_type, int lck_type, int probe, size_t len)
47 {
48         struct flock fl;
49         int ret;
50
51         if (tdb->flags & TDB_NOLOCK) {
52                 return 0;
53         }
54
55         if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
56                 tdb->ecode = TDB_ERR_RDONLY;
57                 return -1;
58         }
59
60         fl.l_type = rw_type;
61         fl.l_whence = SEEK_SET;
62         fl.l_start = offset;
63         fl.l_len = len;
64         fl.l_pid = 0;
65
66         do {
67                 ret = fcntl(tdb->fd,lck_type,&fl);
68
69                 /* Check for a sigalarm break. */
70                 if (ret == -1 && errno == EINTR &&
71                                 tdb->interrupt_sig_ptr &&
72                                 *tdb->interrupt_sig_ptr) {
73                         break;
74                 }
75         } while (ret == -1 && errno == EINTR);
76
77         if (ret == -1) {
78                 /* Generic lock error. errno set by fcntl.
79                  * EAGAIN is an expected return from non-blocking
80                  * locks. */
81                 if (!probe && lck_type != F_SETLK) {
82                         /* Ensure error code is set for log fun to examine. */
83                         tdb->ecode = TDB_ERR_LOCK;
84                         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 
85                                  tdb->fd, offset, rw_type, lck_type, (int)len));
86                 }
87                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
88         }
89         return 0;
90 }
91
92
93 /*
94   upgrade a read lock to a write lock. This needs to be handled in a
95   special way as some OSes (such as solaris) have too conservative
96   deadlock detection and claim a deadlock when progress can be
97   made. For those OSes we may loop for a while.  
98 */
99 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
100 {
101         int count = 1000;
102         while (count--) {
103                 struct timeval tv;
104                 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
105                         return 0;
106                 }
107                 if (errno != EDEADLK) {
108                         break;
109                 }
110                 /* sleep for as short a time as we can - more portable than usleep() */
111                 tv.tv_sec = 0;
112                 tv.tv_usec = 1;
113                 select(0, NULL, NULL, NULL, &tv);
114         }
115         TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
116         return -1;
117 }
118
119
120 /* lock a list in the database. list -1 is the alloc list */
121 static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
122 {
123         struct tdb_lock_type *new_lck;
124         int i;
125         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
126
127         ltype &= ~TDB_MARK_LOCK;
128
129         /* a global lock allows us to avoid per chain locks */
130         if (tdb->global_lock.count && 
131             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
132                 return 0;
133         }
134
135         if (tdb->global_lock.count) {
136                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
137         }
138
139         if (list < -1 || list >= (int)tdb->header.hash_size) {
140                 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n", 
141                            list, ltype));
142                 return -1;
143         }
144         if (tdb->flags & TDB_NOLOCK)
145                 return 0;
146
147         for (i=0; i<tdb->num_lockrecs; i++) {
148                 if (tdb->lockrecs[i].list == list) {
149                         if (tdb->lockrecs[i].count == 0) {
150                                 /*
151                                  * Can't happen, see tdb_unlock(). It should
152                                  * be an assert.
153                                  */
154                                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
155                                          "lck->count == 0 for list %d", list));
156                         }
157                         /*
158                          * Just increment the in-memory struct, posix locks
159                          * don't stack.
160                          */
161                         tdb->lockrecs[i].count++;
162                         return 0;
163                 }
164         }
165
166         new_lck = (struct tdb_lock_type *)realloc(
167                 tdb->lockrecs,
168                 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
169         if (new_lck == NULL) {
170                 errno = ENOMEM;
171                 return -1;
172         }
173         tdb->lockrecs = new_lck;
174
175         /* Since fcntl locks don't nest, we do a lock for the first one,
176            and simply bump the count for future ones */
177         if (!mark_lock &&
178             tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
179                                      0, 1)) {
180                 return -1;
181         }
182
183         tdb->num_locks++;
184
185         tdb->lockrecs[tdb->num_lockrecs].list = list;
186         tdb->lockrecs[tdb->num_lockrecs].count = 1;
187         tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
188         tdb->num_lockrecs += 1;
189
190         return 0;
191 }
192
193 /* lock a list in the database. list -1 is the alloc list */
194 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
195 {
196         int ret;
197         ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
198         if (ret) {
199                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
200                          "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
201         }
202         return ret;
203 }
204
205 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
206 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
207 {
208         return _tdb_lock(tdb, list, ltype, F_SETLK);
209 }
210
211
212 /* unlock the database: returns void because it's too late for errors. */
213         /* changed to return int it may be interesting to know there
214            has been an error  --simo */
215 int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
216 {
217         int ret = -1;
218         int i;
219         struct tdb_lock_type *lck = NULL;
220         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
221
222         ltype &= ~TDB_MARK_LOCK;
223
224         /* a global lock allows us to avoid per chain locks */
225         if (tdb->global_lock.count && 
226             (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
227                 return 0;
228         }
229
230         if (tdb->global_lock.count) {
231                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
232         }
233
234         if (tdb->flags & TDB_NOLOCK)
235                 return 0;
236
237         /* Sanity checks */
238         if (list < -1 || list >= (int)tdb->header.hash_size) {
239                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
240                 return ret;
241         }
242
243         for (i=0; i<tdb->num_lockrecs; i++) {
244                 if (tdb->lockrecs[i].list == list) {
245                         lck = &tdb->lockrecs[i];
246                         break;
247                 }
248         }
249
250         if ((lck == NULL) || (lck->count == 0)) {
251                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
252                 return -1;
253         }
254
255         if (lck->count > 1) {
256                 lck->count--;
257                 return 0;
258         }
259
260         /*
261          * This lock has count==1 left, so we need to unlock it in the
262          * kernel. We don't bother with decrementing the in-memory array
263          * element, we're about to overwrite it with the last array element
264          * anyway.
265          */
266
267         if (mark_lock) {
268                 ret = 0;
269         } else {
270                 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
271                                                F_SETLKW, 0, 1);
272         }
273         tdb->num_locks--;
274
275         /*
276          * Shrink the array by overwriting the element just unlocked with the
277          * last array element.
278          */
279
280         if (tdb->num_lockrecs > 1) {
281                 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
282         }
283         tdb->num_lockrecs -= 1;
284
285         /*
286          * We don't bother with realloc when the array shrinks, but if we have
287          * a completely idle tdb we should get rid of the locked array.
288          */
289
290         if (tdb->num_lockrecs == 0) {
291                 SAFE_FREE(tdb->lockrecs);
292         }
293
294         if (ret)
295                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n")); 
296         return ret;
297 }
298
299 /*
300   get the transaction lock
301  */
302 int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
303 {
304         if (tdb->have_transaction_lock || tdb->global_lock.count) {
305                 return 0;
306         }
307         if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, 
308                                      F_SETLKW, 0, 1) == -1) {
309                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
310                 tdb->ecode = TDB_ERR_LOCK;
311                 return -1;
312         }
313         tdb->have_transaction_lock = 1;
314         return 0;
315 }
316
317 /*
318   release the transaction lock
319  */
320 int tdb_transaction_unlock(struct tdb_context *tdb)
321 {
322         int ret;
323         if (!tdb->have_transaction_lock) {
324                 return 0;
325         }
326         ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
327         if (ret == 0) {
328                 tdb->have_transaction_lock = 0;
329         }
330         return ret;
331 }
332
333
334
335
336 /* lock/unlock entire database */
337 static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
338 {
339         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
340
341         ltype &= ~TDB_MARK_LOCK;
342
343         /* There are no locks on read-only dbs */
344         if (tdb->read_only || tdb->traverse_read)
345                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
346
347         if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
348                 tdb->global_lock.count++;
349                 return 0;
350         }
351
352         if (tdb->global_lock.count) {
353                 /* a global lock of a different type exists */
354                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
355         }
356         
357         if (tdb->num_locks != 0) {
358                 /* can't combine global and chain locks */
359                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
360         }
361
362         if (!mark_lock &&
363             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
364                                      0, 4*tdb->header.hash_size)) {
365                 if (op == F_SETLKW) {
366                         TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
367                 }
368                 return -1;
369         }
370
371         tdb->global_lock.count = 1;
372         tdb->global_lock.ltype = ltype;
373
374         return 0;
375 }
376
377
378
379 /* unlock entire db */
380 static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
381 {
382         bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
383
384         ltype &= ~TDB_MARK_LOCK;
385
386         /* There are no locks on read-only dbs */
387         if (tdb->read_only || tdb->traverse_read) {
388                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
389         }
390
391         if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
392                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
393         }
394
395         if (tdb->global_lock.count > 1) {
396                 tdb->global_lock.count--;
397                 return 0;
398         }
399
400         if (!mark_lock &&
401             tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 
402                                      0, 4*tdb->header.hash_size)) {
403                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
404                 return -1;
405         }
406
407         tdb->global_lock.count = 0;
408         tdb->global_lock.ltype = 0;
409
410         return 0;
411 }
412
413 /* lock entire database with write lock */
414 int tdb_lockall(struct tdb_context *tdb)
415 {
416         tdb_trace(tdb, "tdb_lockall\n");
417         return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
418 }
419
420 /* lock entire database with write lock - mark only */
421 int tdb_lockall_mark(struct tdb_context *tdb)
422 {
423         tdb_trace(tdb, "tdb_lockall_mark\n");
424         return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
425 }
426
427 /* unlock entire database with write lock - unmark only */
428 int tdb_lockall_unmark(struct tdb_context *tdb)
429 {
430         tdb_trace(tdb, "tdb_lockall_unmark\n");
431         return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
432 }
433
434 /* lock entire database with write lock - nonblocking varient */
435 int tdb_lockall_nonblock(struct tdb_context *tdb)
436 {
437         int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);
438         tdb_trace(tdb, "tdb_lockall_nonblock = %i\n", ret);
439         return ret;
440 }
441
442 /* unlock entire database with write lock */
443 int tdb_unlockall(struct tdb_context *tdb)
444 {
445         tdb_trace(tdb, "tdb_unlockall\n");
446         return _tdb_unlockall(tdb, F_WRLCK);
447 }
448
449 /* lock entire database with read lock */
450 int tdb_lockall_read(struct tdb_context *tdb)
451 {
452         tdb_trace(tdb, "tdb_lockall_read\n");
453         return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
454 }
455
456 /* lock entire database with read lock - nonblock varient */
457 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
458 {
459         int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);
460         tdb_trace(tdb, "tdb_lockall_read_nonblock = %i\n", ret);
461         return ret;
462 }
463
464 /* unlock entire database with read lock */
465 int tdb_unlockall_read(struct tdb_context *tdb)
466 {
467         tdb_trace(tdb, "tdb_unlockall_read\n");
468         return _tdb_unlockall(tdb, F_RDLCK);
469 }
470
471 /* lock/unlock one hash chain. This is meant to be used to reduce
472    contention - it cannot guarantee how many records will be locked */
473 int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
474 {
475         tdb_trace(tdb, "tdb_chainlock ");
476         tdb_trace_record(tdb, key);
477         tdb_trace(tdb, "\n");
478         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
479 }
480
481 /* lock/unlock one hash chain, non-blocking. This is meant to be used
482    to reduce contention - it cannot guarantee how many records will be
483    locked */
484 int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
485 {
486         int ret = tdb_lock_nonblock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
487         tdb_trace(tdb, "tdb_chainlock_nonblock ");
488         tdb_trace_record(tdb, key);
489         tdb_trace(tdb, "= %i\n", ret);
490         return ret;
491 }
492
493 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
494 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
495 {
496         tdb_trace(tdb, "tdb_chainlock_mark ");
497         tdb_trace_record(tdb, key);
498         tdb_trace(tdb, "\n");
499         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
500 }
501
502 /* unmark a chain as locked without actually locking it. Warning! use with great caution! */
503 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
504 {
505         tdb_trace(tdb, "tdb_chainlock_unmark ");
506         tdb_trace_record(tdb, key);
507         tdb_trace(tdb, "\n");
508         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
509 }
510
511 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
512 {
513         tdb_trace(tdb, "tdb_chainunlock ");
514         tdb_trace_record(tdb, key);
515         tdb_trace(tdb, "\n");
516         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
517 }
518
519 int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
520 {
521         tdb_trace(tdb, "tdb_chainlock_read ");
522         tdb_trace_record(tdb, key);
523         tdb_trace(tdb, "\n");
524         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
525 }
526
527 int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
528 {
529         tdb_trace(tdb, "tdb_chainunlock_read ");
530         tdb_trace_record(tdb, key);
531         tdb_trace(tdb, "\n");
532         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
533 }
534
535
536
537 /* record lock stops delete underneath */
538 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
539 {
540         if (tdb->global_lock.count) {
541                 return 0;
542         }
543         return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
544 }
545
546 /*
547   Write locks override our own fcntl readlocks, so check it here.
548   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
549   an error to fail to get the lock here.
550 */
551 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
552 {
553         struct tdb_traverse_lock *i;
554         for (i = &tdb->travlocks; i; i = i->next)
555                 if (i->off == off)
556                         return -1;
557         return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
558 }
559
560 /*
561   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
562   an error to fail to get the lock here.
563 */
564 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
565 {
566         return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
567 }
568
569 /* fcntl locks don't stack: avoid unlocking someone else's */
570 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
571 {
572         struct tdb_traverse_lock *i;
573         uint32_t count = 0;
574
575         if (tdb->global_lock.count) {
576                 return 0;
577         }
578
579         if (off == 0)
580                 return 0;
581         for (i = &tdb->travlocks; i; i = i->next)
582                 if (i->off == off)
583                         count++;
584         return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
585 }