• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4309615408

pending completion
4309615408

push

github

GitHub
chore: Remove an unused `Arc<RwLock>` (#1106)

6 of 6 new or added lines in 2 files covered. (100.0%)

28466 of 40109 relevant lines covered (70.97%)

50957.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.94
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::collections::HashMap;
2
use std::fmt::Debug;
3
use std::path::PathBuf;
4

5
use dozer_storage::lmdb::{RoTransaction, RwTransaction, Transaction};
6
use dozer_storage::lmdb_storage::{
7
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
8
};
9

10
use dozer_types::node::SourceStates;
11
use dozer_types::parking_lot::RwLockReadGuard;
12

13
use dozer_types::types::{Field, FieldType, IndexDefinition, Record};
14
use dozer_types::types::{Schema, SchemaIdentifier};
15

16
use super::super::{RoCache, RwCache};
17
use super::indexer::Indexer;
18
use super::utils::{self, CacheReadOptions};
19
use super::utils::{CacheOptions, CacheOptionsKind};
20
use crate::cache::expression::QueryExpression;
21
use crate::cache::index::get_primary_key;
22
use crate::cache::RecordWithId;
23
use crate::errors::CacheError;
24
use query::LmdbQueryHandler;
25

26
mod checkpoint_database;
27
mod helper;
28
mod id_database;
29
mod query;
30
mod record_database;
31
mod schema_database;
32
mod secondary_index_database;
33

34
use checkpoint_database::CheckpointDatabase;
35
pub use id_database::IdDatabase;
36
pub use record_database::RecordDatabase;
37
use schema_database::SchemaDatabase;
38
use secondary_index_database::SecondaryIndexDatabase;
39

40
/// Secondary index databases keyed by `(schema identifier, index)`, where `index` is the
/// position of the index definition within the schema's list of secondary indexes.
pub type SecondaryIndexDatabases = HashMap<(SchemaIdentifier, usize), SecondaryIndexDatabase>;
41

42
/// Options shared by the read-only and the read-write cache.
#[derive(Clone, Debug)]
pub struct CacheCommonOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,
    /// Maximum number of databases.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
    /// Db path will be `PathBuf.join(String)`.
    pub path: Option<(PathBuf, String)>,
}

impl Default for CacheCommonOptions {
    /// Returns the default options: 1000 readers, 1000 databases, an intersection chunk size
    /// of 100 and no explicit path.
    fn default() -> Self {
        // The reader limit and the database limit share the same default value.
        const DEFAULT_LIMIT: u32 = 1000;
        const DEFAULT_INTERSECTION_CHUNK_SIZE: usize = 100;

        Self {
            max_readers: DEFAULT_LIMIT,
            max_db_size: DEFAULT_LIMIT,
            intersection_chunk_size: DEFAULT_INTERSECTION_CHUNK_SIZE,
            path: None,
        }
    }
}
×
67

68
#[derive(Debug)]
×
69
pub struct LmdbRoCache {
×
70
    common: LmdbCacheCommon,
71
    env: LmdbEnvironmentManager,
72
}
73

74
impl LmdbRoCache {
75
    pub fn new(options: CacheCommonOptions) -> Result<Self, CacheError> {
137✔
76
        let (mut env, name) = utils::init_env(&CacheOptions {
137✔
77
            common: options.clone(),
137✔
78
            kind: CacheOptionsKind::ReadOnly(CacheReadOptions {}),
137✔
79
        })?;
137✔
80
        let common = LmdbCacheCommon::new(&mut env, options, name, true)?;
137✔
81
        Ok(Self { common, env })
137✔
82
    }
137✔
83
}
×
84

85
/// Options that only apply to a cache opened for writing.
#[derive(Clone, Debug)]
pub struct CacheWriteOptions {
    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,
}

impl Default for CacheWriteOptions {
    /// Returns the default write options, with `max_size` set to 1 TiB.
    fn default() -> Self {
        Self {
            // 2^40 bytes, i.e. 1024 * 1024 * 1024 * 1024.
            max_size: 1 << 40,
        }
    }
}
×
99

100
#[derive(Debug)]
×
101
pub struct LmdbRwCache {
×
102
    common: LmdbCacheCommon,
103
    checkpoint_db: CheckpointDatabase,
104
    txn: SharedTransaction,
105
}
106

107
impl LmdbRwCache {
108
    pub fn create(
145✔
109
        schemas: impl IntoIterator<Item = (String, Schema, Vec<IndexDefinition>)>,
145✔
110
        common_options: CacheCommonOptions,
145✔
111
        write_options: CacheWriteOptions,
145✔
112
    ) -> Result<Self, CacheError> {
145✔
113
        let mut cache = Self::open(common_options, write_options)?;
145✔
114

×
115
        let mut txn = cache.txn.write();
145✔
116
        for (schema_name, schema, secondary_indexes) in schemas {
289✔
117
            cache
144✔
118
                .common
144✔
119
                .insert_schema(&mut txn, schema_name, schema, secondary_indexes)?;
144✔
120
        }
×
121

122
        txn.commit_and_renew()?;
145✔
123
        drop(txn);
145✔
124

145✔
125
        Ok(cache)
145✔
126
    }
145✔
127

×
128
    pub fn open(
149✔
129
        common_options: CacheCommonOptions,
149✔
130
        write_options: CacheWriteOptions,
149✔
131
    ) -> Result<Self, CacheError> {
149✔
132
        let (mut env, name) = utils::init_env(&CacheOptions {
149✔
133
            common: common_options.clone(),
149✔
134
            kind: CacheOptionsKind::Write(write_options),
149✔
135
        })?;
149✔
136
        let common = LmdbCacheCommon::new(&mut env, common_options, name, false)?;
149✔
137
        let checkpoint_db = CheckpointDatabase::new(&mut env)?;
149✔
138
        let txn = env.create_txn()?;
149✔
139
        Ok(Self {
149✔
140
            common,
149✔
141
            checkpoint_db,
149✔
142
            txn,
149✔
143
        })
149✔
144
    }
149✔
145
}
×
146

147
impl<C: LmdbCache> RoCache for C {
148
    fn name(&self) -> &str {
148✔
149
        &self.common().name
148✔
150
    }
148✔
151

×
152
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
43✔
153
        let txn = self.begin_txn()?;
43✔
154
        let txn = txn.as_txn();
43✔
155
        let id = self.common().id.get(txn, key)?;
43✔
156
        let record = self.common().db.get(txn, id)?;
43✔
157
        Ok(RecordWithId::new(id_from_bytes(id), record))
36✔
158
    }
43✔
159

×
160
    fn count(&self, schema_name: &str, query: &QueryExpression) -> Result<usize, CacheError> {
2,220✔
161
        let txn = self.begin_txn()?;
2,220✔
162
        let txn = txn.as_txn();
2,220✔
163
        let (schema, secondary_indexes) = self
2,220✔
164
            .common()
2,220✔
165
            .schema_db
2,220✔
166
            .get_schema_from_name(schema_name)
2,220✔
167
            .ok_or_else(|| CacheError::SchemaNotFound(schema_name.to_string()))?;
2,220✔
168
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
2,220✔
169
        handler.count()
2,220✔
170
    }
2,220✔
171

×
172
    fn query(
2,237✔
173
        &self,
2,237✔
174
        schema_name: &str,
2,237✔
175
        query: &QueryExpression,
2,237✔
176
    ) -> Result<(&Schema, Vec<RecordWithId>), CacheError> {
2,237✔
177
        let txn = self.begin_txn()?;
2,237✔
178
        let txn = txn.as_txn();
2,237✔
179
        let (schema, secondary_indexes) = self
2,237✔
180
            .common()
2,237✔
181
            .schema_db
2,237✔
182
            .get_schema_from_name(schema_name)
2,237✔
183
            .ok_or_else(|| CacheError::SchemaNotFound(schema_name.to_string()))?;
2,237✔
184
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
2,237✔
185
        let records = handler.query()?;
2,237✔
186
        Ok((schema, records))
2,236✔
187
    }
2,237✔
188

×
189
    fn get_schema_and_indexes_by_name(
55✔
190
        &self,
55✔
191
        name: &str,
55✔
192
    ) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
55✔
193
        let schema = self
55✔
194
            .common()
55✔
195
            .schema_db
55✔
196
            .get_schema_from_name(name)
55✔
197
            .ok_or_else(|| CacheError::SchemaNotFound(name.to_string()))?;
55✔
198
        Ok(schema)
55✔
199
    }
55✔
200

×
201
    fn get_schema(&self, schema_identifier: SchemaIdentifier) -> Result<&Schema, CacheError> {
1✔
202
        self.common()
1✔
203
            .schema_db
1✔
204
            .get_schema(schema_identifier)
1✔
205
            .map(|(schema, _)| schema)
1✔
206
            .ok_or(CacheError::SchemaIdentifierNotFound(schema_identifier))
1✔
207
    }
1✔
208
}
×
209

210
impl RwCache for LmdbRwCache {
211
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
11,971✔
212
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(record)?;
11,971✔
213
        record.version = Some(INITIAL_RECORD_VERSION);
11,971✔
214
        self.insert_impl(record, schema, secondary_indexes)
11,971✔
215
    }
11,971✔
216

×
217
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
6✔
218
        let (_, _, version) = self.delete_impl(key)?;
6✔
219
        Ok(version)
6✔
220
    }
6✔
221

×
222
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
7✔
223
        let (schema, secondary_indexes, old_version) = self.delete_impl(key)?;
7✔
224
        record.version = Some(old_version + 1);
7✔
225
        self.insert_impl(record, schema, secondary_indexes)?;
7✔
226
        Ok(old_version)
7✔
227
    }
7✔
228

×
229
    fn commit(&self, checkpoint: &SourceStates) -> Result<(), CacheError> {
139✔
230
        let mut txn = self.txn.write();
139✔
231
        self.checkpoint_db.write(txn.txn_mut(), checkpoint)?;
139✔
232
        txn.commit_and_renew()?;
139✔
233
        Ok(())
139✔
234
    }
139✔
235

×
236
    fn get_checkpoint(&self) -> Result<SourceStates, CacheError> {
×
237
        let txn = self.txn.read();
×
238
        self.checkpoint_db.read(txn.txn())
×
239
    }
×
240
}
×
241

242
impl LmdbRwCache {
243
    fn delete_impl(&self, key: &[u8]) -> Result<(&Schema, &[IndexDefinition], u32), CacheError> {
13✔
244
        let record = self.get(key)?.record;
13✔
245
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(&record)?;
13✔
246

×
247
        let mut txn = self.txn.write();
13✔
248
        let txn = txn.txn_mut();
13✔
249

×
250
        let id = self.common.id.get(txn, key)?;
13✔
251
        self.common.db.delete(txn, id)?;
13✔
252

×
253
        let indexer = Indexer {
13✔
254
            secondary_indexes: &self.common.secondary_indexes,
13✔
255
        };
13✔
256
        indexer.delete_indexes(txn, &record, schema, secondary_indexes, id)?;
13✔
257
        let version = record
13✔
258
            .version
13✔
259
            .expect("All records in cache should have a version");
13✔
260
        Ok((schema, secondary_indexes, version))
13✔
261
    }
13✔
262

×
263
    fn insert_impl(
11,978✔
264
        &self,
11,978✔
265
        record: &Record,
11,978✔
266
        schema: &Schema,
11,978✔
267
        secondary_indexes: &[IndexDefinition],
11,978✔
268
    ) -> Result<u64, CacheError> {
11,978✔
269
        let mut txn = self.txn.write();
11,978✔
270
        let txn = txn.txn_mut();
11,978✔
271

×
272
        let id = if schema.primary_index.is_empty() {
11,978✔
273
            self.common.id.get_or_generate(txn, None)?
1✔
274
        } else {
×
275
            let primary_key = get_primary_key(&schema.primary_index, &record.values);
11,977✔
276
            self.common.id.get_or_generate(txn, Some(&primary_key))?
11,977✔
277
        };
×
278
        self.common.db.insert(txn, id, record)?;
11,978✔
279

×
280
        let indexer = Indexer {
11,978✔
281
            secondary_indexes: &self.common.secondary_indexes,
11,978✔
282
        };
11,978✔
283

11,978✔
284
        indexer.build_indexes(txn, record, schema, secondary_indexes, id)?;
11,978✔
285

×
286
        Ok(id_from_bytes(id))
11,978✔
287
    }
11,978✔
288
}
×
289

290
/// Decodes a record id from its 8-byte big-endian representation.
fn id_from_bytes(bytes: [u8; 8]) -> u64 {
    // Reinterpret the bytes in native order, then convert from big-endian to native.
    let raw = u64::from_ne_bytes(bytes);
    u64::from_be(raw)
}
2,389,433✔
293

×
294
/// Encodes a record id as its 8-byte big-endian representation.
fn id_to_bytes(id: u64) -> [u8; 8] {
    // Convert to big-endian first, then take the bytes in native (memory) order.
    let big_endian = id.to_be();
    big_endian.to_ne_bytes()
}
1,139,602✔
297

×
298
/// This trait abstracts the behavior of getting a transaction from a `LmdbExclusiveTransaction` or a `lmdb::Transaction`.
299
trait AsTransaction {
300
    type Transaction<'a>: Transaction
301
    where
302
        Self: 'a;
303

304
    fn as_txn(&self) -> &Self::Transaction<'_>;
305
}
306

307
/// A read-only transaction is its own transaction.
impl AsTransaction for RoTransaction<'_> {
    type Transaction<'txn> = RoTransaction<'txn> where Self: 'txn;

    /// Returns `self` unchanged.
    fn as_txn(&self) -> &Self::Transaction<'_> {
        self
    }
}
×
314

315
impl<'a> AsTransaction for RwLockReadGuard<'a, LmdbExclusiveTransaction> {
316
    type Transaction<'env> = RwTransaction<'env> where Self: 'env;
317

318
    fn as_txn(&self) -> &Self::Transaction<'_> {
79✔
319
        self.txn()
79✔
320
    }
79✔
321
}
×
322

323
/// This trait abstracts the behavior of locking a `SharedTransaction` for reading
324
/// and beginning a `RoTransaction` from `LmdbEnvironmentManager`.
325
trait LmdbCache: Send + Sync + Debug {
326
    type AsTransaction<'a>: AsTransaction
327
    where
328
        Self: 'a;
329

330
    fn common(&self) -> &LmdbCacheCommon;
331
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError>;
332

333
    fn get_schema_and_indexes_from_record(
11,984✔
334
        &self,
11,984✔
335
        record: &Record,
11,984✔
336
    ) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
11,984✔
337
        let schema_identifier = record.schema_id.ok_or(CacheError::SchemaHasNoIdentifier)?;
11,984✔
338
        let schema = self
11,984✔
339
            .common()
11,984✔
340
            .schema_db
11,984✔
341
            .get_schema(schema_identifier)
11,984✔
342
            .ok_or(CacheError::SchemaIdentifierNotFound(schema_identifier))?;
11,984✔
343

×
344
        debug_check_schema_record_consistency(&schema.0, record);
11,984✔
345

11,984✔
346
        Ok(schema)
11,984✔
347
    }
11,984✔
348
}
×
349

350
impl LmdbCache for LmdbRoCache {
351
    type AsTransaction<'a> = RoTransaction<'a>;
352

353
    fn common(&self) -> &LmdbCacheCommon {
8,882✔
354
        &self.common
8,882✔
355
    }
8,882✔
356

×
357
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
4,421✔
358
        Ok(self.env.begin_ro_txn()?)
4,421✔
359
    }
4,421✔
360
}
×
361

362
impl LmdbCache for LmdbRwCache {
363
    type AsTransaction<'a> = RwLockReadGuard<'a, LmdbExclusiveTransaction>;
364

365
    fn common(&self) -> &LmdbCacheCommon {
12,306✔
366
        &self.common
12,306✔
367
    }
12,306✔
368

×
369
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
79✔
370
        Ok(self.txn.read())
79✔
371
    }
79✔
372
}
×
373

374
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
11,984✔
375
    debug_assert_eq!(schema.identifier, record.schema_id);
11,984✔
376
    debug_assert_eq!(schema.fields.len(), record.values.len());
11,984✔
377
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
107,772✔
378
        if field.nullable && value == &Field::Null {
107,772✔
379
            continue;
17,869✔
380
        }
89,903✔
381
        match field.typ {
89,903✔
382
            FieldType::UInt => {
×
383
                debug_assert!(value.as_uint().is_some())
41,862✔
384
            }
×
385
            FieldType::Int => {
386
                debug_assert!(value.as_int().is_some())
60✔
387
            }
×
388
            FieldType::Float => {
389
                debug_assert!(value.as_float().is_some())
12,000✔
390
            }
×
391
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
392
            FieldType::String => debug_assert!(value.as_string().is_some()),
29,978✔
393
            FieldType::Text => debug_assert!(value.as_text().is_some()),
3✔
394
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
395
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
396
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
6,000✔
397
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
398
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
399
            FieldType::Point => debug_assert!(value.as_point().is_some()),
×
400
        }
×
401
    }
402
}
11,984✔
403

×
404
/// Version assigned to a record when it is first inserted.
const INITIAL_RECORD_VERSION: u32 = 1;
405

406
#[derive(Debug)]
×
407
pub struct LmdbCacheCommon {
×
408
    db: RecordDatabase,
409
    id: IdDatabase,
410
    secondary_indexes: SecondaryIndexDatabases,
411
    schema_db: SchemaDatabase,
412
    cache_options: CacheCommonOptions,
413
    /// File name of the database.
414
    name: String,
415
}
416

417
impl LmdbCacheCommon {
418
    fn new(
286✔
419
        env: &mut LmdbEnvironmentManager,
286✔
420
        options: CacheCommonOptions,
286✔
421
        name: String,
286✔
422
        read_only: bool,
286✔
423
    ) -> Result<Self, CacheError> {
286✔
424
        // Create or open must have databases.
×
425
        let db = RecordDatabase::new(env, !read_only)?;
286✔
426
        let id = IdDatabase::new(env, !read_only)?;
286✔
427
        let schema_db = SchemaDatabase::new(env, !read_only)?;
286✔
428

×
429
        // Open existing secondary index databases.
430
        let mut secondary_indexe_databases = HashMap::default();
286✔
431
        for (schema, secondary_indexes) in schema_db.get_all_schemas() {
286✔
432
            let schema_id = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
133✔
433
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
664✔
434
                let db = SecondaryIndexDatabase::open(env, &schema_id, index, index_definition)?;
664✔
435
                secondary_indexe_databases.insert((schema_id, index), db);
664✔
436
            }
×
437
        }
438

439
        Ok(Self {
286✔
440
            db,
286✔
441
            id,
286✔
442
            secondary_indexes: secondary_indexe_databases,
286✔
443
            schema_db,
286✔
444
            cache_options: options,
286✔
445
            name,
286✔
446
        })
286✔
447
    }
286✔
448

×
449
    fn insert_schema(
144✔
450
        &mut self,
144✔
451
        txn: &mut LmdbExclusiveTransaction,
144✔
452
        schema_name: String,
144✔
453
        schema: Schema,
144✔
454
        secondary_indexes: Vec<IndexDefinition>,
144✔
455
    ) -> Result<(), CacheError> {
144✔
456
        let schema_id = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
144✔
457
        for (index, index_definition) in secondary_indexes.iter().enumerate() {
687✔
458
            let db =
687✔
459
                SecondaryIndexDatabase::create(txn, &schema_id, index, index_definition, true)?;
687✔
460
            self.secondary_indexes.insert((schema_id, index), db);
687✔
461
        }
×
462

×
463
        self.schema_db
144✔
464
            .insert(txn.txn_mut(), schema_name, schema, secondary_indexes)?;
144✔
465
        Ok(())
144✔
466
    }
144✔
467
}
×
468

×
469
/// Test-only accessors.
#[cfg(test)]
mod tests {
    use super::*;

    impl LmdbRwCache {
        /// Exposes the shared transaction and the secondary index databases to tests.
        pub fn get_txn_and_secondary_indexes(
            &self,
        ) -> (&SharedTransaction, &SecondaryIndexDatabases) {
            let Self { txn, common, .. } = self;
            (txn, &common.secondary_indexes)
        }
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc