• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.59
/dozer-core/src/dag/record_store.rs
1
use crate::dag::errors::ExecutionError;
2
use crate::dag::errors::ExecutionError::{
3
    InternalError, RecordNotFound, UnsupportedDeleteOperation, UnsupportedUpdateOperation,
4
};
5
use crate::dag::node::OutputPortType;
6
use std::collections::VecDeque;
7

8
use crate::storage::common::Database;
9
use crate::storage::errors::StorageError;
10
use crate::storage::errors::StorageError::{DeserializationError, SerializationError};
11
use crate::storage::lmdb_storage::SharedTransaction;
12
use crate::storage::prefix_transaction::PrefixTransaction;
13
use dozer_types::bincode;
14
use dozer_types::types::{
15
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
16
};
17
use std::fmt::{Debug, Formatter};
18

19
pub trait RecordWriter {
20
    fn write(&mut self, op: Operation, tx: &SharedTransaction)
21
        -> Result<Operation, ExecutionError>;
22
    fn commit(&self) -> Result<(), ExecutionError>;
23
}
24

25
impl Debug for dyn RecordWriter {
26
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
27
        f.write_str("RecordWriter")
×
28
    }
×
29
}
30

31
pub trait RecordReader: Send + Sync {
32
    fn get(&self, key: &[u8], version: u32) -> Result<Option<Record>, ExecutionError>;
33
}
34

35
impl Debug for dyn RecordReader {
36
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
37
        f.write_str("RecordReader")
×
38
    }
×
39
}
40

41
/// Stateless namespace for the [`RecordWriter`] factory (`create_writer`).
pub(crate) struct RecordWriterUtils {}
42

43
impl RecordWriterUtils {
44
    pub fn create_writer(
204✔
45
        typ: OutputPortType,
204✔
46
        db: Database,
204✔
47
        meta_db: Database,
204✔
48
        schema: Schema,
204✔
49
        retention_queue_size: usize,
204✔
50
    ) -> Result<Box<dyn RecordWriter>, ExecutionError> {
204✔
51
        match typ {
204✔
52
            OutputPortType::StatefulWithPrimaryKeyLookup {
53
                retr_old_records_for_updates,
203✔
54
                retr_old_records_for_deletes,
203✔
55
            } => Ok(Box::new(PrimaryKeyLookupRecordWriter::new(
203✔
56
                db,
203✔
57
                meta_db,
203✔
58
                schema,
203✔
59
                retr_old_records_for_deletes,
203✔
60
                retr_old_records_for_updates,
203✔
61
                retention_queue_size,
203✔
62
            ))),
203✔
63
            OutputPortType::AutogenRowKeyLookup => Ok(Box::new(
1✔
64
                AutogenRowKeyLookupRecordWriter::new(db, meta_db, schema),
1✔
65
            )),
1✔
66
            _ => panic!("Unexpected port type in RecordWriterUtils::create_writer(): {typ}"),
×
67
        }
68
    }
204✔
69
}
70

71
/// Index id (key prefix) of the index holding stored values keyed by primary key + version.
const VERSIONED_RECORDS_INDEX_ID: u32 = 1;
/// Index id (key prefix) of the index mapping a primary key to its latest version.
const RECORD_VERSIONS_INDEX_ID: u32 = 2;
/// Version assigned to a freshly inserted record.
const INITIAL_RECORD_VERSION: u32 = 1;

/// Leading byte of a stored value that is followed by a serialized record.
const RECORD_PRESENT_FLAG: u8 = 1;
/// The single byte stored as the value of a deleted record (tombstone).
const RECORD_DELETED_FLAG: u8 = 0;
77

78
#[derive(Debug)]
×
79
pub(crate) struct PrimaryKeyLookupRecordWriter {
80
    db: Database,
81
    _meta_db: Database,
82
    schema: Schema,
83
    retr_old_records_for_deletes: bool,
84
    retr_old_records_for_updates: bool,
85
    retention_queue_size: usize,
86
    retention_queue: VecDeque<(Vec<u8>, u32)>,
87
}
88

89
impl PrimaryKeyLookupRecordWriter {
90
    pub(crate) fn new(
205✔
91
        db: Database,
205✔
92
        meta_db: Database,
205✔
93
        schema: Schema,
205✔
94
        retr_old_records_for_deletes: bool,
205✔
95
        retr_old_records_for_updates: bool,
205✔
96
        retention_queue_size: usize,
205✔
97
    ) -> Self {
205✔
98
        Self {
205✔
99
            db,
205✔
100
            _meta_db: meta_db,
205✔
101
            schema,
205✔
102
            retr_old_records_for_deletes,
205✔
103
            retr_old_records_for_updates,
205✔
104
            retention_queue_size,
205✔
105
            retention_queue: VecDeque::with_capacity(retention_queue_size),
205✔
106
        }
205✔
107
    }
205✔
108

109
    #[inline]
110
    pub(crate) fn get_last_record_version(
60,046✔
111
        &self,
60,046✔
112
        rec_key: &[u8],
60,046✔
113
        tx: &SharedTransaction,
60,046✔
114
    ) -> Result<u32, ExecutionError> {
60,046✔
115
        let mut exclusive_tx = Box::new(tx.write());
60,046✔
116
        let versions_tx = PrefixTransaction::new(exclusive_tx.as_mut(), RECORD_VERSIONS_INDEX_ID);
60,046✔
117
        match versions_tx.get(self.db, rec_key)? {
60,046✔
118
            Some(payload) => Ok(u32::from_le_bytes(payload.try_into().unwrap())),
60,045✔
119
            None => Err(ExecutionError::RecordNotFound()),
1✔
120
        }
121
    }
60,046✔
122

123
    #[inline]
124
    pub(crate) fn put_last_record_version(
592,016✔
125
        &self,
592,016✔
126
        rec_key: &[u8],
592,016✔
127
        version: u32,
592,016✔
128
        tx: &SharedTransaction,
592,016✔
129
    ) -> Result<(), ExecutionError> {
592,016✔
130
        let mut exclusive_tx = Box::new(tx.write());
592,016✔
131
        let mut versions_tx =
592,016✔
132
            PrefixTransaction::new(exclusive_tx.as_mut(), RECORD_VERSIONS_INDEX_ID);
592,016✔
133
        versions_tx
592,016✔
134
            .put(self.db, rec_key, &version.to_le_bytes())
592,016✔
135
            .map_err(ExecutionError::InternalDatabaseError)
592,016✔
136
    }
592,016✔
137

138
    pub(crate) fn write_versioned_record(
139
        &self,
140
        record: Option<&Record>,
141
        mut key: Vec<u8>,
142
        version: u32,
143
        _schema: &Schema,
144
        tx: &SharedTransaction,
145
    ) -> Result<(), ExecutionError> {
146
        self.put_last_record_version(&key, version, tx)?;
592,135✔
147
        key.extend(version.to_le_bytes());
592,135✔
148
        let value = match record {
592,135✔
149
            Some(r) => {
561,893✔
150
                let mut v = Vec::with_capacity(32);
561,893✔
151
                v.extend(RECORD_PRESENT_FLAG.to_le_bytes());
561,893✔
152
                v.extend(bincode::serialize(r).map_err(|e| SerializationError {
561,893✔
153
                    typ: "Record".to_string(),
×
154
                    reason: Box::new(e),
×
155
                })?);
561,893✔
156
                v
561,893✔
157
            }
158
            None => Vec::from(RECORD_DELETED_FLAG.to_le_bytes()),
30,242✔
159
        };
160
        let mut exclusive_tx = Box::new(tx.write());
592,135✔
161
        let mut store = PrefixTransaction::new(exclusive_tx.as_mut(), VERSIONED_RECORDS_INDEX_ID);
592,135✔
162
        store.put(self.db, key.as_slice(), value.as_slice())?;
592,135✔
163
        Ok(())
591,352✔
164
    }
591,352✔
165

166
    pub(crate) fn retr_versioned_record(
60,048✔
167
        &self,
60,048✔
168
        mut key: Vec<u8>,
60,048✔
169
        version: u32,
60,048✔
170
        tx: &SharedTransaction,
60,048✔
171
    ) -> Result<Option<Record>, ExecutionError> {
60,048✔
172
        key.extend(version.to_le_bytes());
60,048✔
173
        let mut exclusive_tx = Box::new(tx.write());
60,048✔
174
        let store = PrefixTransaction::new(exclusive_tx.as_mut(), VERSIONED_RECORDS_INDEX_ID);
60,048✔
175
        let curr = store
60,048✔
176
            .get(self.db, &key)?
60,048✔
177
            .ok_or_else(ExecutionError::RecordNotFound)?;
60,048✔
178
        let rec: Option<Record> = match curr[0] {
60,048✔
179
            RECORD_PRESENT_FLAG => {
180
                Some(
181
                    bincode::deserialize(&curr[1..]).map_err(|e| DeserializationError {
60,047✔
182
                        typ: "Record".to_string(),
×
183
                        reason: Box::new(e),
×
184
                    })?,
60,047✔
185
                )
186
            }
187
            _ => None,
1✔
188
        };
189
        Ok(rec)
60,048✔
190
    }
60,048✔
191

192
    pub(crate) fn del_versioned_record(
39,999✔
193
        &self,
39,999✔
194
        mut key: Vec<u8>,
39,999✔
195
        version: u32,
39,999✔
196
        tx: &SharedTransaction,
39,999✔
197
    ) -> Result<bool, ExecutionError> {
39,999✔
198
        key.extend(version.to_le_bytes());
39,999✔
199
        let mut exclusive_tx = Box::new(tx.write());
39,999✔
200
        let mut store = PrefixTransaction::new(exclusive_tx.as_mut(), VERSIONED_RECORDS_INDEX_ID);
39,999✔
201
        store
39,999✔
202
            .del(self.db, &key, None)
39,999✔
203
            .map_err(|e| InternalError(Box::new(e)))
39,999✔
204
    }
39,999✔
205

206
    pub(crate) fn push_pop_retention_queue(
60,042✔
207
        &mut self,
60,042✔
208
        key: Vec<u8>,
60,042✔
209
        version: u32,
60,042✔
210
        tx: &SharedTransaction,
60,042✔
211
    ) -> Result<(), ExecutionError> {
60,042✔
212
        self.retention_queue.push_back((key, version));
60,042✔
213
        if self.retention_queue.len() > self.retention_queue_size {
60,042✔
214
            if let Some((key, ver)) = self.retention_queue.pop_front() {
39,999✔
215
                self.del_versioned_record(key, ver, tx)?;
39,999✔
216
            }
×
217
        }
20,043✔
218
        Ok(())
60,042✔
219
    }
60,042✔
220
}
221

222
impl RecordWriter for PrimaryKeyLookupRecordWriter {
223
    fn write(
592,154✔
224
        &mut self,
592,154✔
225
        op: Operation,
592,154✔
226
        tx: &SharedTransaction,
592,154✔
227
    ) -> Result<Operation, ExecutionError> {
592,154✔
228
        match op {
592,154✔
229
            Operation::Insert { mut new } => {
531,788✔
230
                let key = new.get_key(&self.schema.primary_index);
531,788✔
231
                self.write_versioned_record(
531,788✔
232
                    Some(&new),
531,788✔
233
                    key,
531,788✔
234
                    INITIAL_RECORD_VERSION,
531,788✔
235
                    &self.schema,
531,788✔
236
                    tx,
531,788✔
237
                )?;
531,788✔
238
                new.version = Some(INITIAL_RECORD_VERSION);
531,658✔
239
                Ok(Operation::Insert { new })
531,658✔
240
            }
241
            Operation::Delete { mut old } => {
30,345✔
242
                let key = old.get_key(&self.schema.primary_index);
30,345✔
243
                let curr_version = self.get_last_record_version(&key, tx)?;
30,345✔
244
                if self.retr_old_records_for_deletes {
30,345✔
245
                    old = self
30,021✔
246
                        .retr_versioned_record(key.to_owned(), curr_version, tx)?
30,021✔
247
                        .ok_or_else(RecordNotFound)?;
30,021✔
248
                }
324✔
249
                self.push_pop_retention_queue(key.clone(), curr_version, tx)?;
30,345✔
250
                self.write_versioned_record(None, key, curr_version + 1, &self.schema, tx)?;
30,345✔
251
                old.version = Some(curr_version);
30,021✔
252
                Ok(Operation::Delete { old })
30,021✔
253
            }
254
            Operation::Update { mut old, mut new } => {
30,021✔
255
                let key = old.get_key(&self.schema.primary_index);
30,021✔
256
                let curr_version = self.get_last_record_version(&key, tx)?;
30,021✔
257
                if self.retr_old_records_for_updates {
30,021✔
258
                    old = self
30,021✔
259
                        .retr_versioned_record(key.to_owned(), curr_version, tx)?
30,021✔
260
                        .ok_or_else(RecordNotFound)?;
30,021✔
261
                }
×
262
                self.push_pop_retention_queue(key.clone(), curr_version, tx)?;
30,021✔
263
                self.write_versioned_record(Some(&new), key, curr_version + 1, &self.schema, tx)?;
30,021✔
264
                old.version = Some(curr_version);
30,021✔
265
                new.version = Some(curr_version + 1);
30,021✔
266
                Ok(Operation::Update { old, new })
30,021✔
267
            }
268
        }
269
    }
591,700✔
270

271
    fn commit(&self) -> Result<(), ExecutionError> {
750✔
272
        Ok(())
750✔
273
    }
750✔
274
}
275

276
#[derive(Debug)]
×
277
pub struct PrimaryKeyValueLookupRecordReader {
278
    tx: SharedTransaction,
279
    db: Database,
280
}
281

282
impl PrimaryKeyValueLookupRecordReader {
283
    pub fn new(tx: SharedTransaction, db: Database) -> Self {
184✔
284
        Self { tx, db }
184✔
285
    }
184✔
286
}
287

288
impl RecordReader for PrimaryKeyValueLookupRecordReader {
289
    fn get(&self, key: &[u8], version: u32) -> Result<Option<Record>, ExecutionError> {
178,430✔
290
        let mut versioned_key: Vec<u8> = Vec::with_capacity(key.len() + 4);
178,430✔
291
        versioned_key.extend(VERSIONED_RECORDS_INDEX_ID.to_be_bytes());
178,430✔
292
        versioned_key.extend(key);
178,430✔
293
        versioned_key.extend(version.to_le_bytes());
178,430✔
294

178,430✔
295
        let guard = self.tx.read();
178,430✔
296

297
        let buf = guard
178,430✔
298
            .get(self.db, &versioned_key)?
178,430✔
299
            .ok_or_else(RecordNotFound)?;
178,430✔
300
        let rec: Option<Record> = match buf[0] {
178,430✔
301
            RECORD_PRESENT_FLAG => {
302
                let mut r: Record =
178,429✔
303
                    bincode::deserialize(&buf[1..]).map_err(|e| DeserializationError {
178,429✔
304
                        typ: "Record".to_string(),
×
305
                        reason: Box::new(e),
×
306
                    })?;
178,429✔
307
                r.version = Some(INITIAL_RECORD_VERSION);
178,429✔
308
                Some(r)
178,429✔
309
            }
310
            _ => None,
1✔
311
        };
312
        Ok(rec)
178,430✔
313
    }
178,430✔
314
}
315

316
/// Name of the column that holds the autogenerated row id.
const DOZER_ROWID: &'static str = "_DOZER_ROWID";
317

318
#[derive(Debug)]
×
319
pub struct AutogenRowKeyLookupRecordWriter {
320
    db: Database,
321
    meta_db: Database,
322
    schema: Schema,
323
}
324

325
impl AutogenRowKeyLookupRecordWriter {
326
    const COUNTER_KEY: u16 = 0_u16;
327

328
    pub fn prepare_schema(mut schema: Schema) -> Schema {
1✔
329
        schema.fields.push(FieldDefinition::new(
1✔
330
            DOZER_ROWID.to_string(),
1✔
331
            FieldType::UInt,
1✔
332
            false,
1✔
333
            SourceDefinition::Dynamic,
1✔
334
        ));
1✔
335
        schema.primary_index = vec![schema.fields.len() - 1];
1✔
336
        schema
1✔
337
    }
1✔
338

339
    pub fn new(db: Database, meta_db: Database, schema: Schema) -> Self {
2✔
340
        Self {
2✔
341
            db,
2✔
342
            meta_db,
2✔
343
            schema,
2✔
344
        }
2✔
345
    }
2✔
346

347
    fn write_record(
1,001✔
348
        &self,
1,001✔
349
        rec: &Record,
1,001✔
350
        schema: &Schema,
1,001✔
351
        tx: &SharedTransaction,
1,001✔
352
    ) -> Result<(), ExecutionError> {
1,001✔
353
        let key = rec.get_key(&schema.primary_index);
1,001✔
354
        let value = bincode::serialize(&rec).map_err(|e| SerializationError {
1,001✔
355
            typ: "Record".to_string(),
×
356
            reason: Box::new(e),
×
357
        })?;
1,001✔
358
        tx.write().put(self.db, key.as_slice(), value.as_slice())?;
1,001✔
359
        Ok(())
1,001✔
360
    }
1,001✔
361

362
    fn get_autogen_counter(&mut self, tx: &SharedTransaction) -> Result<u64, StorageError> {
1,001✔
363
        let curr_counter = match tx
1,001✔
364
            .read()
1,001✔
365
            .get(self.meta_db, &Self::COUNTER_KEY.to_le_bytes())?
1,001✔
366
        {
367
            Some(c) => u64::from_le_bytes(c.try_into().map_err(|e| {
999✔
368
                StorageError::DeserializationError {
×
369
                    typ: "u64".to_string(),
×
370
                    reason: Box::new(e),
×
371
                }
×
372
            })?),
999✔
373
            _ => 1_u64,
2✔
374
        };
375
        tx.write().put(
1,001✔
376
            self.meta_db,
1,001✔
377
            &Self::COUNTER_KEY.to_le_bytes(),
1,001✔
378
            &(curr_counter + 1).to_le_bytes(),
1,001✔
379
        )?;
1,001✔
380
        Ok(curr_counter)
1,001✔
381
    }
1,001✔
382
}
383

384
impl RecordWriter for AutogenRowKeyLookupRecordWriter {
385
    fn write(
1,001✔
386
        &mut self,
1,001✔
387
        op: Operation,
1,001✔
388
        tx: &SharedTransaction,
1,001✔
389
    ) -> Result<Operation, ExecutionError> {
1,001✔
390
        match op {
1,001✔
391
            Operation::Insert { mut new } => {
1,001✔
392
                let ctr = self.get_autogen_counter(tx)?;
1,001✔
393
                new.values.push(Field::UInt(ctr));
1,001✔
394
                assert!(
1,001✔
395
                    self.schema.primary_index.len() == 1
1,001✔
396
                        && self.schema.primary_index[0] == new.values.len() - 1
1,001✔
397
                );
398
                self.write_record(&new, &self.schema, tx)?;
1,001✔
399
                new.version = Some(INITIAL_RECORD_VERSION);
1,001✔
400
                Ok(Operation::Insert { new })
1,001✔
401
            }
402
            Operation::Update { .. } => Err(UnsupportedUpdateOperation(
×
403
                "AutogenRowsIdLookupRecordWriter does not support update operations".to_string(),
×
404
            )),
×
405
            Operation::Delete { .. } => Err(UnsupportedDeleteOperation(
×
406
                "AutogenRowsIdLookupRecordWriter does not support delete operations".to_string(),
×
407
            )),
×
408
        }
409
    }
1,001✔
410

411
    fn commit(&self) -> Result<(), ExecutionError> {
1✔
412
        Ok(())
1✔
413
    }
1✔
414
}
415

416
#[derive(Debug)]
×
417
pub struct AutogenRowKeyLookupRecordReader {
418
    tx: SharedTransaction,
419
    db: Database,
420
}
421

422
impl AutogenRowKeyLookupRecordReader {
423
    pub fn new(tx: SharedTransaction, db: Database) -> Self {
2✔
424
        Self { tx, db }
2✔
425
    }
2✔
426
}
427

428
impl RecordReader for AutogenRowKeyLookupRecordReader {
429
    fn get(&self, key: &[u8], _version: u32) -> Result<Option<Record>, ExecutionError> {
1,001✔
430
        Ok(match self.tx.read().get(self.db, key)? {
1,001✔
431
            Some(buf) => {
1,001✔
432
                let mut r: Record =
1,001✔
433
                    bincode::deserialize(buf).map_err(|e| DeserializationError {
1,001✔
434
                        typ: "Record".to_string(),
×
435
                        reason: Box::new(e),
×
436
                    })?;
1,001✔
437
                r.version = Some(INITIAL_RECORD_VERSION);
1,001✔
438
                Some(r)
1,001✔
439
            }
440
            None => None,
×
441
        })
442
    }
1,001✔
443
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc