• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.2
/dozer-core/src/dag/dag_metadata.rs
1
use crate::dag::dag::Dag;
2
use crate::dag::dag_schemas::NodeSchemas;
3
use crate::dag::errors::ExecutionError;
4
use crate::dag::errors::ExecutionError::{InvalidNodeHandle, MetadataAlreadyExists};
5
use crate::dag::node::{NodeHandle, PortHandle};
6
use crate::storage::common::Seek;
7
use crate::storage::errors::StorageError;
8
use crate::storage::errors::StorageError::{DeserializationError, SerializationError};
9
use crate::storage::lmdb_storage::{
10
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
11
};
12
use dozer_types::bincode;
13
use dozer_types::types::Schema;
14
use lmdb::Database;
15
use std::collections::HashMap;
16

17
use std::iter::once;
18
use std::path::Path;
19

20
use super::epoch::{OpIdentifier, SourceStates};
21
use super::hash_map_to_vec::insert_vec_element;
22

23
/// Name of the database, inside each node's metadata environment, that holds
/// every metadata record of that node.
pub(crate) const METADATA_DB_NAME: &str = "__META__";

// Every record key stored in `METADATA_DB_NAME` starts with one of the tag
// bytes below; readers dispatch on that first byte to decode the rest.

/// Tag byte for per-source commit records (source handle -> `OpIdentifier`).
const SOURCE_ID_IDENTIFIER: u8 = 0;
/// Tag byte for output-port schema records (port handle -> `Schema`).
pub(crate) const OUTPUT_SCHEMA_IDENTIFIER: u8 = 1;
/// Tag byte for input-port schema records (port handle -> `Schema`).
pub(crate) const INPUT_SCHEMA_IDENTIFIER: u8 = 2;
27

28
/// Result of comparing the commits recorded for one root node across the
/// nodes visited from it (see `DagMetadataManager::get_checkpoint_consistency`).
pub(crate) enum Consistency {
    /// All visited nodes recorded the same commit for the root; `None` when
    /// none of them recorded one.
    FullyConsistent(Option<OpIdentifier>),
    /// Visited nodes disagree: each recorded commit (or `None`) maps to the
    /// nodes that recorded it.
    PartiallyConsistent(HashMap<Option<OpIdentifier>, Vec<NodeHandle>>),
}
32

33
/// Checkpoint metadata read back from a single node's metadata environment.
pub(crate) struct DagMetadata {
    /// Operation identifier recorded per source node (records tagged
    /// `SOURCE_ID_IDENTIFIER`).
    pub commits: SourceStates,
    /// Schema of each input port, keyed by port handle.
    pub input_schemas: HashMap<PortHandle, Schema>,
    /// Schema of each output port, keyed by port handle.
    pub output_schemas: HashMap<PortHandle, Schema>,
}
38

×
39
/// Creates, reads and deletes the per-node checkpoint metadata of a [`Dag`].
///
/// Each DAG node has its own LMDB environment under `path`, named by
/// `metadata_environment_name`.
pub(crate) struct DagMetadataManager<'a, T: Clone> {
    /// The DAG whose nodes' metadata is managed.
    dag: &'a Dag<T>,
    /// Base path under which the per-node metadata environments live.
    path: &'a Path,
    /// Metadata loaded when the manager was constructed, only for nodes that
    /// already had an environment at that time.
    metadata: HashMap<NodeHandle, DagMetadata>,
}
×
44

45
impl<'a, T: Clone + 'a> DagMetadataManager<'a, T> {
46
    pub fn new(
125✔
47
        dag: &'a Dag<T>,
125✔
48
        path: &'a Path,
125✔
49
    ) -> Result<DagMetadataManager<'a, T>, ExecutionError> {
125✔
50
        let metadata = DagMetadataManager::get_checkpoint_metadata(path, dag)?;
125✔
51

52
        Ok(Self {
125✔
53
            path,
125✔
54
            dag,
125✔
55
            metadata,
125✔
56
        })
125✔
57
    }
125✔
58

59
    fn get_node_checkpoint_metadata(
775✔
60
        path: &Path,
775✔
61
        name: &NodeHandle,
775✔
62
    ) -> Result<Option<DagMetadata>, ExecutionError> {
775✔
63
        let env_name = metadata_environment_name(name);
775✔
64
        if !LmdbEnvironmentManager::exists(path, &env_name) {
775✔
65
            return Ok(None);
721✔
66
        }
54✔
67

×
68
        let mut env = LmdbEnvironmentManager::create(path, &env_name)?;
54✔
69
        let db = env.open_database(METADATA_DB_NAME, false)?;
54✔
70
        let txn = env.create_txn()?;
54✔
71
        let txn = SharedTransaction::try_unwrap(txn)
54✔
72
            .expect("We just created this `SharedTransaction`. It's not shared.");
54✔
73

×
74
        let cur = txn.open_ro_cursor(db)?;
54✔
75
        if !cur.first()? {
54✔
76
            return Err(ExecutionError::InternalDatabaseError(
×
77
                StorageError::InvalidRecord,
×
78
            ));
×
79
        }
54✔
80

54✔
81
        let mut commits = SourceStates::default();
54✔
82
        let mut input_schemas: HashMap<PortHandle, Schema> = HashMap::new();
54✔
83
        let mut output_schemas: HashMap<PortHandle, Schema> = HashMap::new();
54✔
84

×
85
        loop {
86
            let value = cur.read()?.ok_or(ExecutionError::InternalDatabaseError(
122✔
87
                StorageError::InvalidRecord,
122✔
88
            ))?;
122✔
89
            match value.0[0] {
122✔
90
                SOURCE_ID_IDENTIFIER => {
×
91
                    commits.extend(once(deserialize_source_metadata(value.0, value.1)))
43✔
92
                }
×
93
                OUTPUT_SCHEMA_IDENTIFIER => {
×
94
                    let handle: PortHandle = PortHandle::from_be_bytes(
40✔
95
                        (&value.0[1..])
40✔
96
                            .try_into()
40✔
97
                            .map_err(|_e| ExecutionError::InvalidPortHandle(0))?,
40✔
98
                    );
×
99
                    let schema: Schema =
40✔
100
                        bincode::deserialize(value.1).map_err(|e| DeserializationError {
40✔
101
                            typ: "Schema".to_string(),
×
102
                            reason: Box::new(e),
×
103
                        })?;
40✔
104
                    output_schemas.insert(handle, schema);
40✔
105
                }
×
106
                INPUT_SCHEMA_IDENTIFIER => {
×
107
                    let handle: PortHandle = PortHandle::from_be_bytes(
39✔
108
                        (&value.0[1..])
39✔
109
                            .try_into()
39✔
110
                            .map_err(|_e| ExecutionError::InvalidPortHandle(0))?,
39✔
111
                    );
112
                    let schema: Schema =
39✔
113
                        bincode::deserialize(value.1).map_err(|e| DeserializationError {
39✔
114
                            typ: "Schema".to_string(),
×
115
                            reason: Box::new(e),
×
116
                        })?;
39✔
117
                    input_schemas.insert(handle, schema);
39✔
118
                }
×
119
                _ => {
120
                    return Err(ExecutionError::InternalDatabaseError(
×
121
                        StorageError::InvalidRecord,
×
122
                    ))
×
123
                }
×
124
            }
×
125
            if !cur.next()? {
122✔
126
                break;
54✔
127
            }
68✔
128
        }
×
129

×
130
        Ok(Some(DagMetadata {
54✔
131
            commits,
54✔
132
            input_schemas,
54✔
133
            output_schemas,
54✔
134
        }))
54✔
135
    }
775✔
136

×
137
    fn get_checkpoint_metadata(
185✔
138
        path: &Path,
185✔
139
        dag: &Dag<T>,
185✔
140
    ) -> Result<HashMap<NodeHandle, DagMetadata>, ExecutionError> {
185✔
141
        let mut all = HashMap::<NodeHandle, DagMetadata>::new();
185✔
142
        for node in dag.node_handles() {
775✔
143
            if let Some(metadata) = Self::get_node_checkpoint_metadata(path, node)? {
775✔
144
                all.insert(node.clone(), metadata);
54✔
145
            }
721✔
146
        }
147
        Ok(all)
185✔
148
    }
185✔
149

×
150
    fn get_dependency_tree_consistency(
271✔
151
        &self,
271✔
152
        root_node: &NodeHandle,
271✔
153
    ) -> HashMap<Option<OpIdentifier>, Vec<NodeHandle>> {
271✔
154
        let mut result = HashMap::new();
271✔
155

156
        for node_handle in self.dag.bfs(root_node) {
680✔
157
            let seq = self
680✔
158
                .metadata
680✔
159
                .get(node_handle)
680✔
160
                .and_then(|dag_meta_data| dag_meta_data.commits.get(root_node).copied());
680✔
161

680✔
162
            insert_vec_element(&mut result, seq, node_handle.clone());
680✔
163
        }
680✔
164

×
165
        result
271✔
166
    }
271✔
167

×
168
    pub(crate) fn get_checkpoint_consistency(&self) -> HashMap<NodeHandle, Consistency> {
65✔
169
        let mut r: HashMap<NodeHandle, Consistency> = HashMap::new();
65✔
170
        for node_handle in self.dag.node_handles() {
271✔
171
            let consistency = self.get_dependency_tree_consistency(node_handle);
271✔
172
            debug_assert!(!consistency.is_empty());
271✔
173
            if consistency.len() == 1 {
271✔
174
                r.insert(
269✔
175
                    node_handle.clone(),
269✔
176
                    Consistency::FullyConsistent(*consistency.iter().next().unwrap().0),
269✔
177
                );
269✔
178
            } else {
269✔
179
                r.insert(
2✔
180
                    node_handle.clone(),
2✔
181
                    Consistency::PartiallyConsistent(consistency),
2✔
182
                );
2✔
183
            }
2✔
184
        }
×
185
        r
65✔
186
    }
65✔
187

×
188
    pub(crate) fn delete_metadata(&self) {
240✔
189
        for node in self.dag.node_handles() {
1,122✔
190
            LmdbEnvironmentManager::remove(self.path, &metadata_environment_name(node));
1,122✔
191
        }
1,122✔
192
    }
240✔
193

×
194
    pub(crate) fn get_metadata(&self) -> Result<HashMap<NodeHandle, DagMetadata>, ExecutionError> {
60✔
195
        Self::get_checkpoint_metadata(self.path, self.dag)
60✔
196
    }
60✔
197

×
198
    pub(crate) fn init_metadata(
240✔
199
        &self,
240✔
200
        schemas: &HashMap<NodeHandle, NodeSchemas<T>>,
240✔
201
    ) -> Result<(), ExecutionError> {
240✔
202
        for node in self.dag.node_handles() {
1,122✔
203
            let curr_node_schema = schemas
1,122✔
204
                .get(node)
1,122✔
205
                .ok_or_else(|| InvalidNodeHandle(node.clone()))?;
1,122✔
206

×
207
            let env_name = metadata_environment_name(node);
1,122✔
208
            if LmdbEnvironmentManager::exists(self.path, &env_name) {
1,122✔
209
                return Err(MetadataAlreadyExists(node.clone()));
×
210
            }
1,122✔
211

×
212
            let mut env = LmdbEnvironmentManager::create(self.path, &env_name)?;
1,122✔
213
            let db = env.open_database(METADATA_DB_NAME, false)?;
1,122✔
214
            let txn = env.create_txn()?;
1,122✔
215
            let mut txn = SharedTransaction::try_unwrap(txn)
1,122✔
216
                .expect("We just created this `SharedTransaction`. It's not shared.");
1,122✔
217

×
218
            for (handle, (schema, _ctx)) in curr_node_schema.output_schemas.iter() {
1,122✔
219
                let mut key: Vec<u8> = vec![OUTPUT_SCHEMA_IDENTIFIER];
902✔
220
                key.extend(handle.to_be_bytes());
902✔
221
                let value = bincode::serialize(schema).map_err(|e| SerializationError {
902✔
222
                    typ: "Schema".to_string(),
×
223
                    reason: Box::new(e),
×
224
                })?;
902✔
225
                txn.put(db, &key, &value)?;
902✔
226
            }
×
227

×
228
            for (handle, (schema, _ctx)) in curr_node_schema.input_schemas.iter() {
1,122✔
229
                let mut key: Vec<u8> = vec![INPUT_SCHEMA_IDENTIFIER];
923✔
230
                key.extend(handle.to_be_bytes());
923✔
231
                let value = bincode::serialize(schema).map_err(|e| SerializationError {
923✔
232
                    typ: "Schema".to_string(),
×
233
                    reason: Box::new(e),
×
234
                })?;
923✔
235
                txn.put(db, &key, &value)?;
923✔
236
            }
×
237

238
            txn.commit_and_renew()?;
1,122✔
239
        }
×
240
        Ok(())
240✔
241
    }
240✔
242
}
×
243

244
fn metadata_environment_name(node_handle: &NodeHandle) -> String {
8,320✔
245
    format!("{node_handle}")
8,320✔
246
}
8,320✔
247

248
pub fn write_source_metadata<'a>(
4,116✔
249
    txn: &mut LmdbExclusiveTransaction,
4,116✔
250
    db: Database,
4,116✔
251
    metadata: &'a mut impl Iterator<Item = (&'a NodeHandle, OpIdentifier)>,
4,116✔
252
) -> Result<(), StorageError> {
4,116✔
253
    for (source, op_id) in metadata {
8,754✔
254
        let (key, value) = serialize_source_metadata(source, op_id);
4,638✔
255

4,638✔
256
        txn.put(db, &key, &value)?;
4,638✔
257
    }
×
258
    Ok(())
4,116✔
259
}
4,116✔
260

×
261
fn serialize_source_metadata(node_handle: &NodeHandle, op_id: OpIdentifier) -> (Vec<u8>, Vec<u8>) {
4,641✔
262
    let mut key: Vec<u8> = vec![SOURCE_ID_IDENTIFIER];
4,641✔
263
    key.extend(node_handle.to_bytes());
4,641✔
264

4,641✔
265
    let mut value: Vec<u8> = Vec::with_capacity(16);
4,641✔
266
    value.extend(op_id.txid.to_be_bytes());
4,641✔
267
    value.extend(op_id.seq_in_tx.to_be_bytes());
4,641✔
268

4,641✔
269
    (key, value)
4,641✔
270
}
4,641✔
271

×
272
fn deserialize_source_metadata(key: &[u8], value: &[u8]) -> (NodeHandle, OpIdentifier) {
47✔
273
    debug_assert!(key[0] == SOURCE_ID_IDENTIFIER);
47✔
274
    let source = NodeHandle::from_bytes(&key[1..]);
45✔
275

45✔
276
    let txid = u64::from_be_bytes(value[0..8].try_into().unwrap());
45✔
277
    let seq_in_tx = u64::from_be_bytes(value[8..16].try_into().unwrap());
45✔
278
    (source, OpIdentifier { txid, seq_in_tx })
45✔
279
}
45✔
280

×
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `(node_handle, op_id)` and checks that it decodes back unchanged.
    fn check_round_trip(node_handle: NodeHandle, op_id: OpIdentifier) {
        let (key, value) = serialize_source_metadata(&node_handle, op_id);
        let (node_handle2, op_id2) = deserialize_source_metadata(&key, &value);
        assert_eq!(node_handle2, node_handle);
        assert_eq!(op_id2, op_id);
    }

    /// A well-formed encoded record used as the starting point of the panic tests.
    fn sample_record() -> (Vec<u8>, Vec<u8>) {
        serialize_source_metadata(
            &NodeHandle::new(None, "node".to_string()),
            OpIdentifier::default(),
        )
    }

    #[test]
    fn test_source_metadata_serialization() {
        check_round_trip(
            NodeHandle::new(None, "node".to_string()),
            OpIdentifier::new(0, 0),
        );
        // Distinct multi-byte values catch byte-order or field-swap mistakes
        // that an all-zero identifier cannot reveal.
        check_round_trip(
            NodeHandle::new(None, "another_node".to_string()),
            OpIdentifier {
                txid: 0x0102_0304_0506_0708,
                seq_in_tx: u64::MAX,
            },
        );
    }

    #[test]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_empty_key() {
        deserialize_source_metadata(&[], &[]);
    }

    /// The value is kept well-formed so the only defect is the key's tag byte.
    /// That check is a `debug_assert!`, so this test only exists when debug
    /// assertions are enabled. Previously an empty value made the test pass
    /// regardless of the key check.
    #[test]
    #[cfg(debug_assertions)]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_invalid_key() {
        let (mut key, value) = sample_record();
        key[0] = OUTPUT_SCHEMA_IDENTIFIER;
        deserialize_source_metadata(&key, &value);
    }

    #[test]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_empty_value() {
        let (key, _) = sample_record();
        deserialize_source_metadata(&key, &[]);
    }

    #[test]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_truncated_value() {
        let (key, mut value) = sample_record();
        value.truncate(15);
        deserialize_source_metadata(&key, &value);
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc