• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.16
/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
1
use std::{
2
    hash::{Hash, Hasher},
3
    path::{Path, PathBuf},
4
};
5

6
use dozer_storage::{
7
    errors::StorageError,
8
    lmdb::{RwTransaction, Transaction},
9
    lmdb_storage::{RoLmdbEnvironment, RwLmdbEnvironment},
10
    LmdbEnvironment, LmdbOption,
11
};
12
use dozer_types::{
13
    borrow::IntoOwned,
14
    types::{Field, FieldType, Record, Schema, SchemaWithIndex},
15
};
16
use dozer_types::{
17
    log::warn,
18
    models::api_endpoint::{
19
        OnDeleteResolutionTypes, OnInsertResolutionTypes, OnUpdateResolutionTypes,
20
    },
21
};
22
use tempdir::TempDir;
23

24
use crate::{
25
    cache::{
26
        index,
27
        lmdb::utils::{create_env, open_env},
28
        CacheRecord, RecordMeta, UpsertResult,
29
    },
30
    errors::CacheError,
31
};
32

33
mod operation_log;
34

35
use operation_log::RecordMetadata;
36
pub use operation_log::{Operation, OperationLog};
37

38
use self::operation_log::MetadataKey;
39

40
use super::{CacheOptions, CacheWriteOptions};
41

42
pub trait MainEnvironment: LmdbEnvironment {
43
    fn common(&self) -> &MainEnvironmentCommon;
44

45
    fn schema(&self) -> &SchemaWithIndex;
46

47
    fn base_path(&self) -> &Path {
150✔
48
        &self.common().base_path
150✔
49
    }
150✔
50

51
    fn name(&self) -> &str {
908✔
52
        &self.common().name
908✔
53
    }
908✔
54

55
    fn operation_log(&self) -> OperationLog {
9,598✔
56
        self.common().operation_log
9,598✔
57
    }
9,598✔
58

59
    fn intersection_chunk_size(&self) -> usize {
2✔
60
        self.common().intersection_chunk_size
2✔
61
    }
2✔
62

63
    fn count(&self) -> Result<usize, CacheError> {
54✔
64
        let txn = self.begin_txn()?;
54✔
65
        self.operation_log()
54✔
66
            .count_present_records(&txn, self.schema().0.is_append_only())
54✔
67
            .map_err(Into::into)
54✔
68
    }
54✔
69

70
    fn get(&self, key: &[u8]) -> Result<CacheRecord, CacheError> {
19✔
71
        let txn = self.begin_txn()?;
19✔
72
        self.operation_log()
19✔
73
            .get_record(&txn, key)?
19✔
74
            .ok_or(CacheError::PrimaryKeyNotFound)
19✔
75
    }
19✔
76
}
77

78
/// State held by both `RwMainEnvironment` and `RoMainEnvironment`.
///
/// `RwMainEnvironment::share` clones this value into every read-only handle it creates.
#[derive(Debug, Clone)]
1,945✔
79
pub struct MainEnvironmentCommon {
80
    /// The environment base path.
81
    base_path: PathBuf,
82
    /// The environment name.
83
    name: String,
84
    /// The operation log.
85
    operation_log: OperationLog,
86
    /// Copied from `CacheOptions::intersection_chunk_size` when the environment is created
    /// or opened; exposed through `MainEnvironment::intersection_chunk_size`.
    intersection_chunk_size: usize,
87
}
88

89
/// Read-write handle to the main environment.
///
/// Created by `RwMainEnvironment::new`; read-only handles are derived from it with `share`.
#[derive(Debug)]
×
90
pub struct RwMainEnvironment {
91
    /// The underlying read-write LMDB environment.
    env: RwLmdbEnvironment,
92
    /// State shared with the read-only handles.
    common: MainEnvironmentCommon,
93
    /// Temporary directory handle returned by `create_env`, if any. It is never read; it is
    /// only kept so that it is dropped together with this environment.
    /// NOTE(review): presumably this backs the environment files when no persistent path is
    /// configured — confirm against `create_env`.
    _temp_dir: Option<TempDir>,
94
    /// The schema this environment was created with or loaded from storage.
    schema: SchemaWithIndex,
95
    /// Conflict-resolution settings consulted by `insert`, `delete` and `update`.
    write_options: CacheWriteOptions,
96
}
97

98
impl LmdbEnvironment for RwMainEnvironment {
99
    fn env(&self) -> &dozer_storage::lmdb::Environment {
63✔
100
        self.env.env()
63✔
101
    }
63✔
102
}
103

104
impl MainEnvironment for RwMainEnvironment {
105
    fn common(&self) -> &MainEnvironmentCommon {
903✔
106
        &self.common
903✔
107
    }
903✔
108

109
    fn schema(&self) -> &SchemaWithIndex {
118✔
110
        &self.schema
118✔
111
    }
118✔
112
}
113

114
impl RwMainEnvironment {
115
    pub fn new(
135✔
116
        schema: Option<&SchemaWithIndex>,
135✔
117
        options: &CacheOptions,
135✔
118
        write_options: CacheWriteOptions,
135✔
119
    ) -> Result<Self, CacheError> {
135✔
120
        let (mut env, (base_path, name), temp_dir) = create_env(options)?;
135✔
121

122
        let operation_log = OperationLog::create(&mut env)?;
135✔
123
        let schema_option = LmdbOption::create(&mut env, Some("schema"))?;
135✔
124

125
        let old_schema = schema_option
159✔
126
            .load(&env.begin_txn()?)?
135✔
127
            .map(IntoOwned::into_owned);
159✔
128

129
        let schema = match (schema, old_schema) {
159✔
130
            (Some(schema), Some(old_schema)) => {
×
131
                if &old_schema != schema {
×
132
                    return Err(CacheError::SchemaMismatch {
×
133
                        name,
×
134
                        given: Box::new(schema.clone()),
×
135
                        stored: Box::new(old_schema),
×
136
                    });
×
137
                }
×
138
                old_schema
×
139
            }
140
            (Some(schema), None) => {
155✔
141
                schema_option.store(env.txn_mut()?, schema)?;
155✔
142
                env.commit()?;
155✔
143
                schema.clone()
155✔
144
            }
145
            (None, Some(schema)) => schema,
4✔
146
            (None, None) => return Err(CacheError::SchemaNotFound),
×
147
        };
148

149
        Ok(Self {
159✔
150
            env,
159✔
151
            common: MainEnvironmentCommon {
159✔
152
                base_path,
159✔
153
                name,
159✔
154
                operation_log,
159✔
155
                intersection_chunk_size: options.intersection_chunk_size,
159✔
156
            },
159✔
157
            schema,
159✔
158
            _temp_dir: temp_dir,
159✔
159
            write_options,
159✔
160
        })
159✔
161
    }
159✔
162

163
    pub fn share(&self) -> RoMainEnvironment {
150✔
164
        RoMainEnvironment {
150✔
165
            env: self.env.share(),
150✔
166
            common: self.common.clone(),
150✔
167
            schema: self.schema.clone(),
150✔
168
        }
150✔
169
    }
150✔
170

171
    pub fn insert(&mut self, record: &Record) -> Result<UpsertResult, CacheError> {
5,883✔
172
        let txn = self.env.txn_mut()?;
5,883✔
173
        insert_impl(
5,883✔
174
            self.common.operation_log,
5,883✔
175
            txn,
5,883✔
176
            &self.schema.0,
5,883✔
177
            record,
5,883✔
178
            self.write_options.insert_resolution,
5,883✔
179
        )
5,883✔
180
    }
5,883✔
181

182
    pub fn delete(&mut self, record: &Record) -> Result<Option<RecordMeta>, CacheError> {
9✔
183
        if self.schema.0.is_append_only() {
9✔
184
            return Err(CacheError::AppendOnlySchema);
×
185
        }
9✔
186

187
        let txn = self.env.txn_mut()?;
9✔
188
        let operation_log = self.common.operation_log;
9✔
189
        let key = calculate_key(&self.schema.0, record);
9✔
190

191
        if let Some((meta, insert_operation_id)) =
7✔
192
            get_existing_record_metadata(operation_log, txn, &key)?
9✔
193
        {
194
            // The record exists.
195
            operation_log.delete(txn, key.as_ref(), meta, insert_operation_id)?;
7✔
196
            Ok(Some(meta))
7✔
197
        } else {
198
            // The record does not exist. Resolve the conflict.
199
            match self.write_options.delete_resolution {
2✔
200
                OnDeleteResolutionTypes::Nothing => {
201
                    warn!("Record (Key: {:?}) not found, ignoring delete", key);
1✔
202
                    Ok(None)
1✔
203
                }
204
                OnDeleteResolutionTypes::Panic => Err(CacheError::PrimaryKeyNotFound),
1✔
205
            }
206
        }
207
    }
9✔
208

209
    pub fn update(&mut self, old: &Record, new: &Record) -> Result<UpsertResult, CacheError> {
4✔
210
        // if old_key == new_key {
211
        //     match (key_exist, conflict_resolution) {
212
        //         (true, _) => Updated, // Case 1
213
        //         (false, Nothing) => Ignored, // Case 2
214
        //         (false, Upsert) => Inserted, // Case 3
215
        //         (false, Panic) => Err, // Case 4
216
        //     }
217
        // } else {
218
        //     match (old_key_exist, new_key_exist, conflict_resolution) {
219
        //         (true, true, Nothing) => Ignored, // Case 5
220
        //         (true, true, Upsert) => Err, // Case 6
221
        //         (true, true, Panic) => Err, // Case 7
222
        //         (true, false, _) => Updated, // Case 8
223
        //         (false, true, Nothing) => Ignored, // Case 9
224
        //         (false, true, Upsert) => Err, // Case 10
225
        //         (false, true, Panic) => Err, // Case 11
226
        //         (false, false, Nothing) => Ignored, // Case 12
227
        //         (false, false, Upsert) => Inserted, // Case 13
228
        //         (false, false, Panic) => Err, // Case 14
229
        //     }
230
        // }
231

232
        let txn = self.env.txn_mut()?;
4✔
233
        let operation_log = self.common.operation_log;
4✔
234
        let old_key = calculate_key(&self.schema.0, old);
4✔
235

236
        if let Some((old_meta, insert_operation_id)) =
1✔
237
            get_existing_record_metadata(operation_log, txn, &old_key)?
4✔
238
        {
239
            // Case 1, 5, 6, 7, 8.
240
            let new_key = calculate_key(&self.schema.0, new);
1✔
241
            if new_key.equal(&old_key) {
1✔
242
                // Case 1.
243
                let new_meta = operation_log.update(
1✔
244
                    txn,
1✔
245
                    old_key.as_ref(),
1✔
246
                    new,
1✔
247
                    old_meta,
1✔
248
                    insert_operation_id,
1✔
249
                )?;
1✔
250
                Ok(UpsertResult::Updated { old_meta, new_meta })
1✔
251
            } else {
252
                // Case 5, 6, 7, 8.
253
                let new_metadata = operation_log.get_deleted_metadata(txn, new_key.as_ref())?;
×
254
                match new_metadata {
×
255
                    Some(RecordMetadata {
256
                        insert_operation_id: Some(_),
257
                        ..
258
                    }) => {
259
                        // Case 5, 6, 7.
260
                        if self.write_options.update_resolution == OnUpdateResolutionTypes::Nothing
×
261
                        {
262
                            // Case 5.
263
                            warn!("Old record (Key: {:?}) and new record (Key: {:?}) both exist, ignoring update", old_key, new_key);
×
264
                            Ok(UpsertResult::Ignored)
×
265
                        } else {
266
                            // Case 6, 7.
267
                            Err(CacheError::PrimaryKeyExists)
×
268
                        }
269
                    }
270
                    Some(RecordMetadata {
271
                        meta,
×
272
                        insert_operation_id: None,
×
273
                    }) => {
×
274
                        // Case 8. Meta from deleted record.
×
275
                        operation_log.delete(
×
276
                            txn,
×
277
                            old_key.as_ref(),
×
278
                            old_meta,
×
279
                            insert_operation_id,
×
280
                        )?;
×
281
                        let new_meta =
×
282
                            operation_log.insert_deleted(txn, new_key.as_ref(), new, meta)?;
×
283
                        Ok(UpsertResult::Updated { old_meta, new_meta })
×
284
                    }
285
                    None => {
286
                        // Case 8. Meta from `insert_new`.
287
                        operation_log.delete(
×
288
                            txn,
×
289
                            old_key.as_ref(),
×
290
                            old_meta,
×
291
                            insert_operation_id,
×
292
                        )?;
×
293
                        let new_meta =
×
294
                            operation_log.insert_new(txn, Some(new_key.as_ref()), new)?;
×
295
                        Ok(UpsertResult::Updated { old_meta, new_meta })
×
296
                    }
297
                }
298
            }
299
        } else {
300
            // Case 2, 3, 4, 9, 10, 11, 12, 13.
301
            match self.write_options.update_resolution {
3✔
302
                OnUpdateResolutionTypes::Nothing => {
303
                    // Case 2, 9, 12.
304
                    warn!("Old record (Key: {:?}) not found, ignoring update", old_key);
1✔
305
                    Ok(UpsertResult::Ignored)
1✔
306
                }
307
                OnUpdateResolutionTypes::Upsert => {
308
                    // Case 3, 10, 13.
309
                    insert_impl(
1✔
310
                        operation_log,
1✔
311
                        txn,
1✔
312
                        &self.schema.0,
1✔
313
                        new,
1✔
314
                        OnInsertResolutionTypes::Panic,
1✔
315
                    )
1✔
316
                }
317
                OnUpdateResolutionTypes::Panic => {
318
                    // Case 4, 11, 14.
319
                    Err(CacheError::PrimaryKeyNotFound)
1✔
320
                }
321
            }
322
        }
323
    }
4✔
324

325
    pub fn commit(&mut self) -> Result<(), CacheError> {
245✔
326
        self.env.commit().map_err(Into::into)
245✔
327
    }
245✔
328
}
329

330
/// Key used to look up record metadata, as produced by `calculate_key`.
///
/// `as_ref` converts it into the borrowed `MetadataKey` that the operation log accepts.
#[derive(Debug)]
×
331
enum OwnedMetadataKey<'a> {
332
    /// Owned primary-key bytes, built by `index::get_primary_key` when the schema has a
    /// primary index.
    PrimaryKey(Vec<u8>),
333
    /// The record itself and its hash, used when the schema has no primary index.
    Hash(&'a Record, u64),
334
}
335

336
impl<'a> OwnedMetadataKey<'a> {
337
    fn as_ref(&self) -> MetadataKey<'_> {
11,811✔
338
        match self {
11,811✔
339
            OwnedMetadataKey::PrimaryKey(key) => MetadataKey::PrimaryKey(key),
11,731✔
340
            OwnedMetadataKey::Hash(record, hash) => MetadataKey::Hash(record, *hash),
80✔
341
        }
342
    }
11,811✔
343

344
    fn equal(&self, other: &OwnedMetadataKey) -> bool {
345
        match (self, other) {
1✔
346
            (OwnedMetadataKey::PrimaryKey(key1), OwnedMetadataKey::PrimaryKey(key2)) => {
1✔
347
                key1 == key2
1✔
348
            }
349
            (OwnedMetadataKey::Hash(_, hash1), OwnedMetadataKey::Hash(_, hash2)) => hash1 == hash2,
×
350
            _ => false,
×
351
        }
352
    }
1✔
353
}
354

355
fn calculate_key<'a>(schema: &Schema, record: &'a Record) -> OwnedMetadataKey<'a> {
5,880✔
356
    if schema.primary_index.is_empty() {
5,880✔
357
        let mut hasher = ahash::AHasher::default();
40✔
358
        record.hash(&mut hasher);
40✔
359
        let hash = hasher.finish();
40✔
360
        OwnedMetadataKey::Hash(record, hash)
40✔
361
    } else {
362
        let key = index::get_primary_key(&schema.primary_index, &record.values);
5,840✔
363
        OwnedMetadataKey::PrimaryKey(key)
5,840✔
364
    }
365
}
5,880✔
366

367
fn insert_impl(
5,860✔
368
    operation_log: OperationLog,
5,860✔
369
    txn: &mut RwTransaction,
5,860✔
370
    schema: &Schema,
5,860✔
371
    record: &Record,
5,860✔
372
    insert_resolution: OnInsertResolutionTypes,
5,860✔
373
) -> Result<UpsertResult, CacheError> {
5,860✔
374
    debug_check_schema_record_consistency(schema, record);
5,860✔
375

5,860✔
376
    if schema.is_append_only() {
5,860✔
377
        let meta = operation_log.insert_new(txn, None, record)?;
×
378
        Ok(UpsertResult::Inserted { meta })
×
379
    } else {
380
        let key = calculate_key(schema, record);
5,860✔
381
        let metadata = operation_log.get_deleted_metadata(txn, key.as_ref())?;
5,860✔
382
        match metadata {
4✔
383
            Some(RecordMetadata {
384
                meta,
3✔
385
                insert_operation_id: Some(insert_operation_id),
3✔
386
            }) => {
3✔
387
                // The record already exists. Resolve the conflict.
3✔
388
                match insert_resolution {
3✔
389
                    OnInsertResolutionTypes::Nothing => {
390
                        warn!("Record (Key: {:?}) already exist, ignoring insert", key);
1✔
391
                        Ok(UpsertResult::Ignored)
1✔
392
                    }
393
                    OnInsertResolutionTypes::Panic => Err(CacheError::PrimaryKeyExists),
1✔
394
                    OnInsertResolutionTypes::Update => {
395
                        let new_meta = operation_log.update(
1✔
396
                            txn,
1✔
397
                            key.as_ref(),
1✔
398
                            record,
1✔
399
                            meta,
1✔
400
                            insert_operation_id,
1✔
401
                        )?;
1✔
402
                        Ok(UpsertResult::Updated {
1✔
403
                            old_meta: meta,
1✔
404
                            new_meta,
1✔
405
                        })
1✔
406
                    }
407
                }
408
            }
409
            Some(RecordMetadata {
410
                meta,
1✔
411
                insert_operation_id: None,
412
            }) => {
413
                // The record has an id but was deleted.
414
                let new_meta = operation_log.insert_deleted(txn, key.as_ref(), record, meta)?;
1✔
415
                Ok(UpsertResult::Inserted { meta: new_meta })
1✔
416
            }
417
            None => {
418
                // The record does not exist.
419
                let meta = operation_log.insert_new(txn, Some(key.as_ref()), record)?;
5,868✔
420
                Ok(UpsertResult::Inserted { meta })
5,868✔
421
            }
422
        }
423
    }
424
}
5,872✔
425

426
fn get_existing_record_metadata<T: Transaction>(
13✔
427
    operation_log: OperationLog,
13✔
428
    txn: &T,
13✔
429
    key: &OwnedMetadataKey,
13✔
430
) -> Result<Option<(RecordMeta, u64)>, StorageError> {
13✔
431
    if let Some(RecordMetadata {
432
        meta,
8✔
433
        insert_operation_id: Some(insert_operation_id),
8✔
434
    }) = operation_log.get_present_metadata(txn, key.as_ref())?
13✔
435
    {
436
        Ok(Some((meta, insert_operation_id)))
8✔
437
    } else {
438
        Ok(None)
5✔
439
    }
440
}
13✔
441

442
/// Read-only handle to the main environment.
///
/// Obtained either from `RwMainEnvironment::share` or by opening an existing environment
/// with `RoMainEnvironment::new`.
#[derive(Debug, Clone)]
1,789✔
443
pub struct RoMainEnvironment {
444
    /// The underlying read-only LMDB environment.
    env: RoLmdbEnvironment,
445
    /// State shared with the other handles of the same environment.
    common: MainEnvironmentCommon,
446
    /// The schema of this environment.
    schema: SchemaWithIndex,
447
}
448

449
impl LmdbEnvironment for RoMainEnvironment {
450
    fn env(&self) -> &dozer_storage::lmdb::Environment {
1,826✔
451
        self.env.env()
1,826✔
452
    }
1,826✔
453
}
454

455
impl MainEnvironment for RoMainEnvironment {
456
    fn common(&self) -> &MainEnvironmentCommon {
9,755✔
457
        &self.common
9,755✔
458
    }
9,755✔
459

460
    fn schema(&self) -> &SchemaWithIndex {
759✔
461
        &self.schema
759✔
462
    }
759✔
463
}
464

465
impl RoMainEnvironment {
466
    pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
1✔
467
        let (env, (base_path, name), _temp_dir) = open_env(options)?;
1✔
468

469
        let operation_log = OperationLog::open(&env)?;
1✔
470
        let schema_option = LmdbOption::open(&env, Some("schema"))?;
1✔
471

472
        let schema = schema_option
1✔
473
            .load(&env.begin_txn()?)?
1✔
474
            .map(IntoOwned::into_owned)
1✔
475
            .ok_or(CacheError::SchemaNotFound)?;
1✔
476

477
        Ok(Self {
1✔
478
            env,
1✔
479
            common: MainEnvironmentCommon {
1✔
480
                base_path: base_path.to_path_buf(),
1✔
481
                name: name.to_string(),
1✔
482
                operation_log,
1✔
483
                intersection_chunk_size: options.intersection_chunk_size,
1✔
484
            },
1✔
485
            schema,
1✔
486
        })
1✔
487
    }
1✔
488
}
489

490
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
5,860✔
491
    debug_assert_eq!(schema.identifier, record.schema_id);
5,860✔
492
    debug_assert_eq!(schema.fields.len(), record.values.len());
5,872✔
493
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
29,452✔
494
        if field.nullable && value == &Field::Null {
29,452✔
495
            continue;
11,817✔
496
        }
17,737✔
497
        match field.typ {
17,737✔
498
            FieldType::UInt => {
499
                debug_assert!(value.as_uint().is_some())
11,766✔
500
            }
501
            FieldType::U128 => {
502
                debug_assert!(value.as_u128().is_some())
×
503
            }
504
            FieldType::Int => {
505
                debug_assert!(value.as_int().is_some())
79✔
506
            }
507
            FieldType::I128 => {
508
                debug_assert!(value.as_i128().is_some())
×
509
            }
510
            FieldType::Float => {
511
                debug_assert!(value.as_float().is_some())
24✔
512
            }
513
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
514
            FieldType::String => debug_assert!(value.as_string().is_some()),
5,884✔
515
            FieldType::Text => debug_assert!(value.as_text().is_some()),
2✔
516
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
517
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
518
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
×
519
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
520
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
521
            FieldType::Point => debug_assert!(value.as_point().is_some()),
×
522
            FieldType::Duration => debug_assert!(value.as_duration().is_some()),
×
523
        }
524
    }
525
}
5,980✔
526

527
#[cfg(test)]
528
mod conflict_resolution_tests;
529

530
#[cfg(test)]
531
mod hash_tests;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc