git.ozlabs.org Git - ccan/blob - ccan/tdb2/io.c
tdb2: use counters to decide when to coalesce records.
[ccan] / ccan / tdb2 / io.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    Copyright (C) Rusty Russell                     2010
10
11      ** NOTE! The following LGPL license applies to the tdb
12      ** library. This does NOT imply that all of Samba is released
13      ** under the LGPL
14
15    This library is free software; you can redistribute it and/or
16    modify it under the terms of the GNU Lesser General Public
17    License as published by the Free Software Foundation; either
18    version 3 of the License, or (at your option) any later version.
19
20    This library is distributed in the hope that it will be useful,
21    but WITHOUT ANY WARRANTY; without even the implied warranty of
22    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
23    Lesser General Public License for more details.
24
25    You should have received a copy of the GNU Lesser General Public
26    License along with this library; if not, see <http://www.gnu.org/licenses/>.
27 */
28 #include "private.h"
29 #include <assert.h>
30 #include <ccan/likely/likely.h>
31
32 void tdb_munmap(struct tdb_file *file)
33 {
34         if (file->fd == -1)
35                 return;
36
37         if (file->map_ptr) {
38                 munmap(file->map_ptr, file->map_size);
39                 file->map_ptr = NULL;
40         }
41 }
42
43 void tdb_mmap(struct tdb_context *tdb)
44 {
45         if (tdb->flags & TDB_INTERNAL)
46                 return;
47
48         if (tdb->flags & TDB_NOMMAP)
49                 return;
50
51         /* size_t can be smaller than off_t. */
52         if ((size_t)tdb->file->map_size == tdb->file->map_size) {
53                 tdb->file->map_ptr = mmap(NULL, tdb->file->map_size,
54                                           tdb->mmap_flags,
55                                           MAP_SHARED, tdb->file->fd, 0);
56         } else
57                 tdb->file->map_ptr = MAP_FAILED;
58
59         /*
60          * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
61          */
62         if (tdb->file->map_ptr == MAP_FAILED) {
63                 tdb->file->map_ptr = NULL;
64                 tdb_logerr(tdb, TDB_SUCCESS, TDB_LOG_WARNING,
65                            "tdb_mmap failed for size %lld (%s)",
66                            (long long)tdb->file->map_size, strerror(errno));
67         }
68 }
69
70 /* check for an out of bounds access - if it is out of bounds then
71    see if the database has been expanded by someone else and expand
72    if necessary
73    note that "len" is the minimum length needed for the db
74 */
75 static enum TDB_ERROR tdb_oob(struct tdb_context *tdb, tdb_off_t len,
76                               bool probe)
77 {
78         struct stat st;
79         enum TDB_ERROR ecode;
80
81         /* We can't hold pointers during this: we could unmap! */
82         assert(!tdb->direct_access
83                || (tdb->flags & TDB_NOLOCK)
84                || tdb_has_expansion_lock(tdb));
85
86         if (len <= tdb->file->map_size)
87                 return 0;
88         if (tdb->flags & TDB_INTERNAL) {
89                 if (!probe) {
90                         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
91                                  "tdb_oob len %lld beyond internal"
92                                  " malloc size %lld",
93                                  (long long)len,
94                                  (long long)tdb->file->map_size);
95                 }
96                 return TDB_ERR_IO;
97         }
98
99         ecode = tdb_lock_expand(tdb, F_RDLCK);
100         if (ecode != TDB_SUCCESS) {
101                 return ecode;
102         }
103
104         if (fstat(tdb->file->fd, &st) != 0) {
105                 tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
106                            "Failed to fstat file: %s", strerror(errno));
107                 tdb_unlock_expand(tdb, F_RDLCK);
108                 return TDB_ERR_IO;
109         }
110
111         tdb_unlock_expand(tdb, F_RDLCK);
112
113         if (st.st_size < (size_t)len) {
114                 if (!probe) {
115                         tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
116                                    "tdb_oob len %zu beyond eof at %zu",
117                                    (size_t)len, st.st_size);
118                 }
119                 return TDB_ERR_IO;
120         }
121
122         /* Unmap, update size, remap */
123         tdb_munmap(tdb->file);
124
125         tdb->file->map_size = st.st_size;
126         tdb_mmap(tdb);
127         return TDB_SUCCESS;
128 }
129
130 /* Endian conversion: we only ever deal with 8 byte quantities */
131 void *tdb_convert(const struct tdb_context *tdb, void *buf, tdb_len_t size)
132 {
133         if (unlikely((tdb->flags & TDB_CONVERT)) && buf) {
134                 uint64_t i, *p = (uint64_t *)buf;
135                 for (i = 0; i < size / 8; i++)
136                         p[i] = bswap_64(p[i]);
137         }
138         return buf;
139 }
140
141 /* Return first non-zero offset in offset array, or end, or -ve error. */
142 /* FIXME: Return the off? */
143 uint64_t tdb_find_nonzero_off(struct tdb_context *tdb,
144                               tdb_off_t base, uint64_t start, uint64_t end)
145 {
146         uint64_t i;
147         const uint64_t *val;
148
149         /* Zero vs non-zero is the same unconverted: minor optimization. */
150         val = tdb_access_read(tdb, base + start * sizeof(tdb_off_t),
151                               (end - start) * sizeof(tdb_off_t), false);
152         if (TDB_PTR_IS_ERR(val)) {
153                 return TDB_PTR_ERR(val);
154         }
155
156         for (i = 0; i < (end - start); i++) {
157                 if (val[i])
158                         break;
159         }
160         tdb_access_release(tdb, val);
161         return start + i;
162 }
163
164 /* Return first zero offset in num offset array, or num, or -ve error. */
165 uint64_t tdb_find_zero_off(struct tdb_context *tdb, tdb_off_t off,
166                            uint64_t num)
167 {
168         uint64_t i;
169         const uint64_t *val;
170
171         /* Zero vs non-zero is the same unconverted: minor optimization. */
172         val = tdb_access_read(tdb, off, num * sizeof(tdb_off_t), false);
173         if (TDB_PTR_IS_ERR(val)) {
174                 return TDB_PTR_ERR(val);
175         }
176
177         for (i = 0; i < num; i++) {
178                 if (!val[i])
179                         break;
180         }
181         tdb_access_release(tdb, val);
182         return i;
183 }
184
185 enum TDB_ERROR zero_out(struct tdb_context *tdb, tdb_off_t off, tdb_len_t len)
186 {
187         char buf[8192] = { 0 };
188         void *p = tdb->methods->direct(tdb, off, len, true);
189         enum TDB_ERROR ecode = TDB_SUCCESS;
190
191         assert(!tdb->read_only);
192         if (TDB_PTR_IS_ERR(p)) {
193                 return TDB_PTR_ERR(p);
194         }
195         if (p) {
196                 memset(p, 0, len);
197                 return ecode;
198         }
199         while (len) {
200                 unsigned todo = len < sizeof(buf) ? len : sizeof(buf);
201                 ecode = tdb->methods->twrite(tdb, off, buf, todo);
202                 if (ecode != TDB_SUCCESS) {
203                         break;
204                 }
205                 len -= todo;
206                 off += todo;
207         }
208         return ecode;
209 }
210
211 tdb_off_t tdb_read_off(struct tdb_context *tdb, tdb_off_t off)
212 {
213         tdb_off_t ret;
214         enum TDB_ERROR ecode;
215
216         if (likely(!(tdb->flags & TDB_CONVERT))) {
217                 tdb_off_t *p = tdb->methods->direct(tdb, off, sizeof(*p),
218                                                     false);
219                 if (TDB_PTR_IS_ERR(p)) {
220                         return TDB_PTR_ERR(p);
221                 }
222                 if (p)
223                         return *p;
224         }
225
226         ecode = tdb_read_convert(tdb, off, &ret, sizeof(ret));
227         if (ecode != TDB_SUCCESS) {
228                 return ecode;
229         }
230         return ret;
231 }
232
233 /* write a lump of data at a specified offset */
234 static enum TDB_ERROR tdb_write(struct tdb_context *tdb, tdb_off_t off,
235                                 const void *buf, tdb_len_t len)
236 {
237         enum TDB_ERROR ecode;
238
239         if (tdb->read_only) {
240                 return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
241                                   "Write to read-only database");
242         }
243
244         ecode = tdb->methods->oob(tdb, off + len, 0);
245         if (ecode != TDB_SUCCESS) {
246                 return ecode;
247         }
248
249         if (tdb->file->map_ptr) {
250                 memcpy(off + (char *)tdb->file->map_ptr, buf, len);
251         } else {
252                 ssize_t ret;
253                 ret = pwrite(tdb->file->fd, buf, len, off);
254                 if (ret != len) {
255                         /* This shouldn't happen: we avoid sparse files. */
256                         if (ret >= 0)
257                                 errno = ENOSPC;
258
259                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
260                                           "tdb_write: %zi at %zu len=%zu (%s)",
261                                           ret, (size_t)off, (size_t)len,
262                                           strerror(errno));
263                 }
264         }
265         return TDB_SUCCESS;
266 }
267
268 /* read a lump of data at a specified offset */
269 static enum TDB_ERROR tdb_read(struct tdb_context *tdb, tdb_off_t off,
270                                void *buf, tdb_len_t len)
271 {
272         enum TDB_ERROR ecode;
273
274         ecode = tdb->methods->oob(tdb, off + len, 0);
275         if (ecode != TDB_SUCCESS) {
276                 return ecode;
277         }
278
279         if (tdb->file->map_ptr) {
280                 memcpy(buf, off + (char *)tdb->file->map_ptr, len);
281         } else {
282                 ssize_t r = pread(tdb->file->fd, buf, len, off);
283                 if (r != len) {
284                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
285                                           "tdb_read failed with %zi at %zu "
286                                           "len=%zu (%s) map_size=%zu",
287                                           r, (size_t)off, (size_t)len,
288                                           strerror(errno),
289                                           (size_t)tdb->file->map_size);
290                 }
291         }
292         return TDB_SUCCESS;
293 }
294
295 enum TDB_ERROR tdb_write_convert(struct tdb_context *tdb, tdb_off_t off,
296                                  const void *rec, size_t len)
297 {
298         enum TDB_ERROR ecode;
299
300         if (unlikely((tdb->flags & TDB_CONVERT))) {
301                 void *conv = malloc(len);
302                 if (!conv) {
303                         return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
304                                           "tdb_write: no memory converting"
305                                           " %zu bytes", len);
306                 }
307                 memcpy(conv, rec, len);
308                 ecode = tdb->methods->twrite(tdb, off,
309                                            tdb_convert(tdb, conv, len), len);
310                 free(conv);
311         } else {
312                 ecode = tdb->methods->twrite(tdb, off, rec, len);
313         }
314         return ecode;
315 }
316
317 enum TDB_ERROR tdb_read_convert(struct tdb_context *tdb, tdb_off_t off,
318                                 void *rec, size_t len)
319 {
320         enum TDB_ERROR ecode = tdb->methods->tread(tdb, off, rec, len);
321         tdb_convert(tdb, rec, len);
322         return ecode;
323 }
324
325 enum TDB_ERROR tdb_write_off(struct tdb_context *tdb,
326                              tdb_off_t off, tdb_off_t val)
327 {
328         if (tdb->read_only) {
329                 return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
330                                   "Write to read-only database");
331         }
332
333         if (likely(!(tdb->flags & TDB_CONVERT))) {
334                 tdb_off_t *p = tdb->methods->direct(tdb, off, sizeof(*p),
335                                                     true);
336                 if (TDB_PTR_IS_ERR(p)) {
337                         return TDB_PTR_ERR(p);
338                 }
339                 if (p) {
340                         *p = val;
341                         return TDB_SUCCESS;
342                 }
343         }
344         return tdb_write_convert(tdb, off, &val, sizeof(val));
345 }
346
347 static void *_tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset,
348                              tdb_len_t len, unsigned int prefix)
349 {
350         unsigned char *buf;
351         enum TDB_ERROR ecode;
352
353         /* some systems don't like zero length malloc */
354         buf = malloc(prefix + len ? prefix + len : 1);
355         if (!buf) {
356                 tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_USE_ERROR,
357                            "tdb_alloc_read malloc failed len=%zu",
358                            (size_t)(prefix + len));
359                 return TDB_ERR_PTR(TDB_ERR_OOM);
360         } else {
361                 ecode = tdb->methods->tread(tdb, offset, buf+prefix, len);
362                 if (unlikely(ecode != TDB_SUCCESS)) {
363                         free(buf);
364                         return TDB_ERR_PTR(ecode);
365                 }
366         }
367         return buf;
368 }
369
370 /* read a lump of data, allocating the space for it */
371 void *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
372 {
373         return _tdb_alloc_read(tdb, offset, len, 0);
374 }
375
376 static enum TDB_ERROR fill(struct tdb_context *tdb,
377                            const void *buf, size_t size,
378                            tdb_off_t off, tdb_len_t len)
379 {
380         while (len) {
381                 size_t n = len > size ? size : len;
382                 ssize_t ret = pwrite(tdb->file->fd, buf, n, off);
383                 if (ret != n) {
384                         if (ret >= 0)
385                                 errno = ENOSPC;
386
387                         return tdb_logerr(tdb, TDB_ERR_IO, TDB_LOG_ERROR,
388                                           "fill failed:"
389                                           " %zi at %zu len=%zu (%s)",
390                                           ret, (size_t)off, (size_t)len,
391                                           strerror(errno));
392                 }
393                 len -= n;
394                 off += n;
395         }
396         return TDB_SUCCESS;
397 }
398
399 /* expand a file.  we prefer to use ftruncate, as that is what posix
400   says to use for mmap expansion */
401 static enum TDB_ERROR tdb_expand_file(struct tdb_context *tdb,
402                                       tdb_len_t addition)
403 {
404         char buf[8192];
405         enum TDB_ERROR ecode;
406
407         if (tdb->read_only) {
408                 return tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
409                                   "Expand on read-only database");
410         }
411
412         if (tdb->flags & TDB_INTERNAL) {
413                 char *new = realloc(tdb->file->map_ptr,
414                                     tdb->file->map_size + addition);
415                 if (!new) {
416                         return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
417                                           "No memory to expand database");
418                 }
419                 tdb->file->map_ptr = new;
420                 tdb->file->map_size += addition;
421         } else {
422                 /* Unmap before trying to write; old TDB claimed OpenBSD had
423                  * problem with this otherwise. */
424                 tdb_munmap(tdb->file);
425
426                 /* If this fails, we try to fill anyway. */
427                 if (ftruncate(tdb->file->fd, tdb->file->map_size + addition))
428                         ;
429
430                 /* now fill the file with something. This ensures that the
431                    file isn't sparse, which would be very bad if we ran out of
432                    disk. This must be done with write, not via mmap */
433                 memset(buf, 0x43, sizeof(buf));
434                 ecode = fill(tdb, buf, sizeof(buf), tdb->file->map_size,
435                              addition);
436                 if (ecode != TDB_SUCCESS)
437                         return ecode;
438                 tdb->file->map_size += addition;
439                 tdb_mmap(tdb);
440         }
441         return TDB_SUCCESS;
442 }
443
444 const void *tdb_access_read(struct tdb_context *tdb,
445                             tdb_off_t off, tdb_len_t len, bool convert)
446 {
447         void *ret = NULL;
448
449         if (likely(!(tdb->flags & TDB_CONVERT))) {
450                 ret = tdb->methods->direct(tdb, off, len, false);
451
452                 if (TDB_PTR_IS_ERR(ret)) {
453                         return ret;
454                 }
455         }
456         if (!ret) {
457                 struct tdb_access_hdr *hdr;
458                 hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
459                 if (TDB_PTR_IS_ERR(hdr)) {
460                         return hdr;
461                 }
462                 hdr->next = tdb->access;
463                 tdb->access = hdr;
464                 ret = hdr + 1;
465                 if (convert) {
466                         tdb_convert(tdb, (void *)ret, len);
467                 }
468         } else
469                 tdb->direct_access++;
470
471         return ret;
472 }
473
474 void *tdb_access_write(struct tdb_context *tdb,
475                        tdb_off_t off, tdb_len_t len, bool convert)
476 {
477         void *ret = NULL;
478
479         if (tdb->read_only) {
480                 tdb_logerr(tdb, TDB_ERR_RDONLY, TDB_LOG_USE_ERROR,
481                            "Write to read-only database");
482                 return TDB_ERR_PTR(TDB_ERR_RDONLY);
483         }
484
485         if (likely(!(tdb->flags & TDB_CONVERT))) {
486                 ret = tdb->methods->direct(tdb, off, len, true);
487
488                 if (TDB_PTR_IS_ERR(ret)) {
489                         return ret;
490                 }
491         }
492
493         if (!ret) {
494                 struct tdb_access_hdr *hdr;
495                 hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
496                 if (TDB_PTR_IS_ERR(hdr)) {
497                         return hdr;
498                 }
499                 hdr->next = tdb->access;
500                 tdb->access = hdr;
501                 hdr->off = off;
502                 hdr->len = len;
503                 hdr->convert = convert;
504                 ret = hdr + 1;
505                 if (convert)
506                         tdb_convert(tdb, (void *)ret, len);
507         } else
508                 tdb->direct_access++;
509
510         return ret;
511 }
512
513 static struct tdb_access_hdr **find_hdr(struct tdb_context *tdb, const void *p)
514 {
515         struct tdb_access_hdr **hp;
516
517         for (hp = &tdb->access; *hp; hp = &(*hp)->next) {
518                 if (*hp + 1 == p)
519                         return hp;
520         }
521         return NULL;
522 }
523
524 void tdb_access_release(struct tdb_context *tdb, const void *p)
525 {
526         struct tdb_access_hdr *hdr, **hp = find_hdr(tdb, p);
527
528         if (hp) {
529                 hdr = *hp;
530                 *hp = hdr->next;
531                 free(hdr);
532         } else
533                 tdb->direct_access--;
534 }
535
536 enum TDB_ERROR tdb_access_commit(struct tdb_context *tdb, void *p)
537 {
538         struct tdb_access_hdr *hdr, **hp = find_hdr(tdb, p);
539         enum TDB_ERROR ecode;
540
541         if (hp) {
542                 hdr = *hp;
543                 if (hdr->convert)
544                         ecode = tdb_write_convert(tdb, hdr->off, p, hdr->len);
545                 else
546                         ecode = tdb_write(tdb, hdr->off, p, hdr->len);
547                 *hp = hdr->next;
548                 free(hdr);
549         } else {
550                 tdb->direct_access--;
551                 ecode = TDB_SUCCESS;
552         }
553
554         return ecode;
555 }
556
557 static void *tdb_direct(struct tdb_context *tdb, tdb_off_t off, size_t len,
558                         bool write_mode)
559 {
560         enum TDB_ERROR ecode;
561
562         if (unlikely(!tdb->file->map_ptr))
563                 return NULL;
564
565         ecode = tdb_oob(tdb, off + len, true);
566         if (unlikely(ecode != TDB_SUCCESS))
567                 return TDB_ERR_PTR(ecode);
568         return (char *)tdb->file->map_ptr + off;
569 }
570
571 void tdb_inc_seqnum(struct tdb_context *tdb)
572 {
573         tdb_off_t seq;
574
575         if (likely(!(tdb->flags & TDB_CONVERT))) {
576                 int64_t *direct;
577
578                 direct = tdb->methods->direct(tdb,
579                                               offsetof(struct tdb_header,
580                                                        seqnum),
581                                               sizeof(*direct), true);
582                 if (likely(direct)) {
583                         /* Don't let it go negative, even briefly */
584                         if (unlikely((*direct) + 1) < 0)
585                                 *direct = 0;
586                         (*direct)++;
587                         return;
588                 }
589         }
590
591         seq = tdb_read_off(tdb, offsetof(struct tdb_header, seqnum));
592         if (!TDB_OFF_IS_ERR(seq)) {
593                 seq++;
594                 if (unlikely((int64_t)seq < 0))
595                         seq = 0;
596                 tdb_write_off(tdb, offsetof(struct tdb_header, seqnum), seq);
597         }
598 }
599
/* The default I/O method table; tdb_io_init() points tdb->methods at it.
 *
 * The initializer is positional, so the order of the entries has to
 * follow the member order of struct tdb_methods, which is declared
 * outside this file.
 * NOTE(review): this file calls members named tread, twrite, oob and
 * direct through tdb->methods; presumably those are slots 1, 2, 3 and 5
 * here - confirm against the struct declaration.
 */
static const struct tdb_methods io_methods = {
        tdb_read,
        tdb_write,
        tdb_oob,
        tdb_expand_file,
        tdb_direct,
};
607
608 /*
609   initialise the default methods table
610 */
611 void tdb_io_init(struct tdb_context *tdb)
612 {
613         tdb->methods = &io_methods;
614 }