tdb2: implement tdb_exists and tdb_parse_record
[ccan] / ccan / tdb2 / tdb.c
1 #include "private.h"
2 #include <ccan/asprintf/asprintf.h>
3 #include <stdarg.h>
4
5 /* The null return. */
6 struct tdb_data tdb_null = { .dptr = NULL, .dsize = 0 };
7
8 static enum TDB_ERROR update_rec_hdr(struct tdb_context *tdb,
9                                      tdb_off_t off,
10                                      tdb_len_t keylen,
11                                      tdb_len_t datalen,
12                                      struct tdb_used_record *rec,
13                                      uint64_t h)
14 {
15         uint64_t dataroom = rec_data_length(rec) + rec_extra_padding(rec);
16         enum TDB_ERROR ecode;
17
18         ecode = set_header(tdb, rec, TDB_USED_MAGIC, keylen, datalen,
19                            keylen + dataroom, h);
20         if (ecode == TDB_SUCCESS) {
21                 ecode = tdb_write_convert(tdb, off, rec, sizeof(*rec));
22         }
23         return ecode;
24 }
25
26 static enum TDB_ERROR replace_data(struct tdb_context *tdb,
27                                    struct hash_info *h,
28                                    struct tdb_data key, struct tdb_data dbuf,
29                                    tdb_off_t old_off, tdb_len_t old_room,
30                                    bool growing)
31 {
32         tdb_off_t new_off;
33         enum TDB_ERROR ecode;
34
35         /* Allocate a new record. */
36         new_off = alloc(tdb, key.dsize, dbuf.dsize, h->h, TDB_USED_MAGIC,
37                         growing);
38         if (TDB_OFF_IS_ERR(new_off)) {
39                 return new_off;
40         }
41
42         /* We didn't like the existing one: remove it. */
43         if (old_off) {
44                 add_stat(tdb, frees, 1);
45                 ecode = add_free_record(tdb, old_off,
46                                         sizeof(struct tdb_used_record)
47                                         + key.dsize + old_room);
48                 if (ecode == TDB_SUCCESS)
49                         ecode = replace_in_hash(tdb, h, new_off);
50         } else {
51                 ecode = add_to_hash(tdb, h, new_off);
52         }
53         if (ecode != TDB_SUCCESS) {
54                 return ecode;
55         }
56
57         new_off += sizeof(struct tdb_used_record);
58         ecode = tdb->methods->twrite(tdb, new_off, key.dptr, key.dsize);
59         if (ecode != TDB_SUCCESS) {
60                 return ecode;
61         }
62
63         new_off += key.dsize;
64         ecode = tdb->methods->twrite(tdb, new_off, dbuf.dptr, dbuf.dsize);
65         if (ecode != TDB_SUCCESS) {
66                 return ecode;
67         }
68
69         /* FIXME: tdb_increment_seqnum(tdb); */
70         return TDB_SUCCESS;
71 }
72
73 static enum TDB_ERROR update_data(struct tdb_context *tdb,
74                                   tdb_off_t off,
75                                   struct tdb_data dbuf,
76                                   tdb_len_t extra)
77 {
78         enum TDB_ERROR ecode;
79
80         ecode = tdb->methods->twrite(tdb, off, dbuf.dptr, dbuf.dsize);
81         if (ecode == TDB_SUCCESS && extra) {
82                 /* Put a zero in; future versions may append other data. */
83                 ecode = tdb->methods->twrite(tdb, off + dbuf.dsize, "", 1);
84         }
85         return ecode;
86 }
87
88 enum TDB_ERROR tdb_store(struct tdb_context *tdb,
89                          struct tdb_data key, struct tdb_data dbuf, int flag)
90 {
91         struct hash_info h;
92         tdb_off_t off;
93         tdb_len_t old_room = 0;
94         struct tdb_used_record rec;
95         enum TDB_ERROR ecode;
96
97         off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
98         if (TDB_OFF_IS_ERR(off)) {
99                 return off;
100         }
101
102         /* Now we have lock on this hash bucket. */
103         if (flag == TDB_INSERT) {
104                 if (off) {
105                         ecode = TDB_ERR_EXISTS;
106                         goto out;
107                 }
108         } else {
109                 if (off) {
110                         old_room = rec_data_length(&rec)
111                                 + rec_extra_padding(&rec);
112                         if (old_room >= dbuf.dsize) {
113                                 /* Can modify in-place.  Easy! */
114                                 ecode = update_rec_hdr(tdb, off,
115                                                        key.dsize, dbuf.dsize,
116                                                        &rec, h.h);
117                                 if (ecode != TDB_SUCCESS) {
118                                         goto out;
119                                 }
120                                 ecode = update_data(tdb,
121                                                     off + sizeof(rec)
122                                                     + key.dsize, dbuf,
123                                                     old_room - dbuf.dsize);
124                                 if (ecode != TDB_SUCCESS) {
125                                         goto out;
126                                 }
127                                 tdb_unlock_hashes(tdb, h.hlock_start,
128                                                   h.hlock_range, F_WRLCK);
129                                 return TDB_SUCCESS;
130                         }
131                 } else {
132                         if (flag == TDB_MODIFY) {
133                                 /* if the record doesn't exist and we
134                                    are in TDB_MODIFY mode then we should fail
135                                    the store */
136                                 ecode = TDB_ERR_NOEXIST;
137                                 goto out;
138                         }
139                 }
140         }
141
142         /* If we didn't use the old record, this implies we're growing. */
143         ecode = replace_data(tdb, &h, key, dbuf, off, old_room, off);
144 out:
145         tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
146         return ecode;
147 }
148
149 enum TDB_ERROR tdb_append(struct tdb_context *tdb,
150                           struct tdb_data key, struct tdb_data dbuf)
151 {
152         struct hash_info h;
153         tdb_off_t off;
154         struct tdb_used_record rec;
155         tdb_len_t old_room = 0, old_dlen;
156         unsigned char *newdata;
157         struct tdb_data new_dbuf;
158         enum TDB_ERROR ecode;
159
160         off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
161         if (TDB_OFF_IS_ERR(off)) {
162                 return off;
163         }
164
165         if (off) {
166                 old_dlen = rec_data_length(&rec);
167                 old_room = old_dlen + rec_extra_padding(&rec);
168
169                 /* Fast path: can append in place. */
170                 if (rec_extra_padding(&rec) >= dbuf.dsize) {
171                         ecode = update_rec_hdr(tdb, off, key.dsize,
172                                                old_dlen + dbuf.dsize, &rec,
173                                                h.h);
174                         if (ecode != TDB_SUCCESS) {
175                                 goto out;
176                         }
177
178                         off += sizeof(rec) + key.dsize + old_dlen;
179                         ecode = update_data(tdb, off, dbuf,
180                                             rec_extra_padding(&rec));
181                         goto out;
182                 }
183
184                 /* Slow path. */
185                 newdata = malloc(key.dsize + old_dlen + dbuf.dsize);
186                 if (!newdata) {
187                         ecode = tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
188                                            "tdb_append:"
189                                            " failed to allocate %zu bytes",
190                                            (size_t)(key.dsize + old_dlen
191                                                     + dbuf.dsize));
192                         goto out;
193                 }
194                 ecode = tdb->methods->tread(tdb, off + sizeof(rec) + key.dsize,
195                                             newdata, old_dlen);
196                 if (ecode != TDB_SUCCESS) {
197                         goto out_free_newdata;
198                 }
199                 memcpy(newdata + old_dlen, dbuf.dptr, dbuf.dsize);
200                 new_dbuf.dptr = newdata;
201                 new_dbuf.dsize = old_dlen + dbuf.dsize;
202         } else {
203                 newdata = NULL;
204                 new_dbuf = dbuf;
205         }
206
207         /* If they're using tdb_append(), it implies they're growing record. */
208         ecode = replace_data(tdb, &h, key, new_dbuf, off, old_room, true);
209
210 out_free_newdata:
211         free(newdata);
212 out:
213         tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
214         return ecode;
215 }
216
217 enum TDB_ERROR tdb_fetch(struct tdb_context *tdb, struct tdb_data key,
218                          struct tdb_data *data)
219 {
220         tdb_off_t off;
221         struct tdb_used_record rec;
222         struct hash_info h;
223         enum TDB_ERROR ecode;
224
225         off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL);
226         if (TDB_OFF_IS_ERR(off)) {
227                 return off;
228         }
229
230         if (!off) {
231                 ecode = TDB_ERR_NOEXIST;
232         } else {
233                 data->dsize = rec_data_length(&rec);
234                 data->dptr = tdb_alloc_read(tdb, off + sizeof(rec) + key.dsize,
235                                             data->dsize);
236                 if (TDB_PTR_IS_ERR(data->dptr)) {
237                         ecode = TDB_PTR_ERR(data->dptr);
238                 } else
239                         ecode = TDB_SUCCESS;
240         }
241
242         tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK);
243         return ecode;
244 }
245
246 bool tdb_exists(struct tdb_context *tdb, TDB_DATA key)
247 {
248         tdb_off_t off;
249         struct tdb_used_record rec;
250         struct hash_info h;
251
252         off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL);
253         if (TDB_OFF_IS_ERR(off)) {
254                 return false;
255         }
256         tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK);
257
258         return off ? true : false;
259 }
260
261 enum TDB_ERROR tdb_delete(struct tdb_context *tdb, struct tdb_data key)
262 {
263         tdb_off_t off;
264         struct tdb_used_record rec;
265         struct hash_info h;
266         enum TDB_ERROR ecode;
267
268         off = find_and_lock(tdb, key, F_WRLCK, &h, &rec, NULL);
269         if (TDB_OFF_IS_ERR(off)) {
270                 return off;
271         }
272
273         if (!off) {
274                 ecode = TDB_ERR_NOEXIST;
275                 goto unlock;
276         }
277
278         ecode = delete_from_hash(tdb, &h);
279         if (ecode != TDB_SUCCESS) {
280                 goto unlock;
281         }
282
283         /* Free the deleted entry. */
284         add_stat(tdb, frees, 1);
285         ecode = add_free_record(tdb, off,
286                                 sizeof(struct tdb_used_record)
287                                 + rec_key_length(&rec)
288                                 + rec_data_length(&rec)
289                                 + rec_extra_padding(&rec));
290
291 unlock:
292         tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_WRLCK);
293         return ecode;
294 }
295
296 unsigned int tdb_get_flags(struct tdb_context *tdb)
297 {
298         return tdb->flags;
299 }
300
301 void tdb_add_flag(struct tdb_context *tdb, unsigned flag)
302 {
303         if (tdb->flags & TDB_INTERNAL) {
304                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
305                            "tdb_add_flag: internal db");
306                 return;
307         }
308         switch (flag) {
309         case TDB_NOLOCK:
310                 tdb->flags |= TDB_NOLOCK;
311                 break;
312         case TDB_NOMMAP:
313                 tdb->flags |= TDB_NOMMAP;
314                 tdb_munmap(tdb->file);
315                 break;
316         case TDB_NOSYNC:
317                 tdb->flags |= TDB_NOSYNC;
318                 break;
319         default:
320                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
321                            "tdb_add_flag: Unknown flag %u", flag);
322         }
323 }
324
325 void tdb_remove_flag(struct tdb_context *tdb, unsigned flag)
326 {
327         if (tdb->flags & TDB_INTERNAL) {
328                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
329                            "tdb_remove_flag: internal db");
330                 return;
331         }
332         switch (flag) {
333         case TDB_NOLOCK:
334                 tdb->flags &= ~TDB_NOLOCK;
335                 break;
336         case TDB_NOMMAP:
337                 tdb->flags &= ~TDB_NOMMAP;
338                 tdb_mmap(tdb);
339                 break;
340         case TDB_NOSYNC:
341                 tdb->flags &= ~TDB_NOSYNC;
342                 break;
343         default:
344                 tdb_logerr(tdb, TDB_ERR_EINVAL, TDB_LOG_USE_ERROR,
345                            "tdb_remove_flag: Unknown flag %u", flag);
346         }
347 }
348
349 const char *tdb_errorstr(enum TDB_ERROR ecode)
350 {
351         /* Gcc warns if you miss a case in the switch, so use that. */
352         switch (ecode) {
353         case TDB_SUCCESS: return "Success";
354         case TDB_ERR_CORRUPT: return "Corrupt database";
355         case TDB_ERR_IO: return "IO Error";
356         case TDB_ERR_LOCK: return "Locking error";
357         case TDB_ERR_OOM: return "Out of memory";
358         case TDB_ERR_EXISTS: return "Record exists";
359         case TDB_ERR_EINVAL: return "Invalid parameter";
360         case TDB_ERR_NOEXIST: return "Record does not exist";
361         case TDB_ERR_RDONLY: return "write not permitted";
362         }
363         return "Invalid error code";
364 }
365
366 enum TDB_ERROR COLD tdb_logerr(struct tdb_context *tdb,
367                                enum TDB_ERROR ecode,
368                                enum tdb_log_level level,
369                                const char *fmt, ...)
370 {
371         char *message;
372         va_list ap;
373         size_t len;
374         /* tdb_open paths care about errno, so save it. */
375         int saved_errno = errno;
376
377         if (!tdb->logfn)
378                 return ecode;
379
380         va_start(ap, fmt);
381         len = vasprintf(&message, fmt, ap);
382         va_end(ap);
383
384         if (len < 0) {
385                 tdb->logfn(tdb, TDB_LOG_ERROR, tdb->log_private,
386                            "out of memory formatting message:");
387                 tdb->logfn(tdb, level, tdb->log_private, fmt);
388         } else {
389                 tdb->logfn(tdb, level, tdb->log_private, message);
390                 free(message);
391         }
392         errno = saved_errno;
393         return ecode;
394 }
395
396 enum TDB_ERROR tdb_parse_record_(struct tdb_context *tdb,
397                                  TDB_DATA key,
398                                  enum TDB_ERROR (*parse)(TDB_DATA key,
399                                                          TDB_DATA data,
400                                                          void *p),
401                                  void *p)
402 {
403         tdb_off_t off;
404         struct tdb_used_record rec;
405         struct hash_info h;
406         TDB_DATA data;
407         enum TDB_ERROR ecode;
408
409         off = find_and_lock(tdb, key, F_RDLCK, &h, &rec, NULL);
410         if (TDB_OFF_IS_ERR(off)) {
411                 return off;
412         }
413
414         if (!off) {
415                 ecode = TDB_ERR_NOEXIST;
416         } else {
417                 data.dsize = rec_data_length(&rec);
418                 data.dptr = (void *)tdb_access_read(tdb,
419                                                     off + sizeof(rec)
420                                                     + key.dsize,
421                                                     data.dsize, false);
422                 if (TDB_PTR_IS_ERR(data.dptr)) {
423                         ecode = TDB_PTR_ERR(data.dptr);
424                 } else {
425                         ecode = parse(key, data, p);
426                         tdb_access_release(tdb, data.dptr);
427                 }
428         }
429
430         tdb_unlock_hashes(tdb, h.hlock_start, h.hlock_range, F_RDLCK);
431         return ecode;
432 }
433