• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.03
/dozer-core/src/dag/dag_schemas.rs
1
use crate::dag::dag::{Dag, NodeType};
2
use crate::dag::errors::ExecutionError;
3
use crate::dag::errors::ExecutionError::InvalidNodeHandle;
4

5
use crate::dag::node::{NodeHandle, OutputPortType, PortHandle};
6
use crate::dag::record_store::AutogenRowKeyLookupRecordWriter;
7
use dozer_types::types::Schema;
8
use std::collections::HashMap;
9

10
#[derive(Clone)]
248✔
11
pub struct NodeSchemas<T> {
12
    pub input_schemas: HashMap<PortHandle, (Schema, T)>,
13
    pub output_schemas: HashMap<PortHandle, (Schema, T)>,
14
}
15

16
impl<T> Default for NodeSchemas<T> {
17
    fn default() -> Self {
×
18
        Self::new()
×
19
    }
×
20
}
21

22
impl<T> NodeSchemas<T> {
23
    pub fn new() -> Self {
398✔
24
        Self {
398✔
25
            input_schemas: HashMap::new(),
398✔
26
            output_schemas: HashMap::new(),
398✔
27
        }
398✔
28
    }
398✔
29
    pub fn from(
×
30
        input_schemas: HashMap<PortHandle, (Schema, T)>,
×
31
        output_schemas: HashMap<PortHandle, (Schema, T)>,
×
32
    ) -> Self {
×
33
        Self {
×
34
            input_schemas,
×
35
            output_schemas,
×
36
        }
×
37
    }
×
38
}
39

40
pub struct DagSchemas<T: Clone> {
41
    schemas: HashMap<NodeHandle, NodeSchemas<T>>,
42
}
43

44
impl<T: Clone> DagSchemas<T> {
45
    pub fn new(dag: &Dag<T>) -> Result<Self, ExecutionError> {
93✔
46
        let schemas = populate_schemas(dag)?;
93✔
47
        Ok(Self { schemas })
93✔
48
    }
93✔
49

×
50
    pub fn get_node_input_schemas(
34✔
51
        &self,
34✔
52
        handle: &NodeHandle,
34✔
53
    ) -> Result<&HashMap<PortHandle, (Schema, T)>, ExecutionError> {
34✔
54
        let node = self
34✔
55
            .schemas
34✔
56
            .get(handle)
34✔
57
            .ok_or_else(|| InvalidNodeHandle(handle.clone()))?;
34✔
58
        Ok(&node.input_schemas)
34✔
59
    }
34✔
60

61
    pub fn get_node_output_schemas(
3✔
62
        &self,
3✔
63
        handle: &NodeHandle,
3✔
64
    ) -> Result<&HashMap<PortHandle, (Schema, T)>, ExecutionError> {
3✔
65
        let node = self
3✔
66
            .schemas
3✔
67
            .get(handle)
3✔
68
            .ok_or_else(|| InvalidNodeHandle(handle.clone()))?;
3✔
69
        Ok(&node.output_schemas)
3✔
70
    }
3✔
71

×
72
    pub fn get_all_schemas(&self) -> &HashMap<NodeHandle, NodeSchemas<T>> {
202✔
73
        &self.schemas
202✔
74
    }
202✔
75
}
76

×
77
pub fn prepare_dag<T: Clone>(
32✔
78
    dag: &Dag<T>,
32✔
79
    dag_schemas: &DagSchemas<T>,
32✔
80
) -> Result<(), ExecutionError> {
32✔
81
    for (handle, node) in dag.nodes() {
142✔
82
        let schemas = dag_schemas
142✔
83
            .get_all_schemas()
142✔
84
            .get(handle)
142✔
85
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?;
142✔
86

×
87
        match node {
142✔
88
            NodeType::Source(s) => s.prepare(schemas.output_schemas.clone())?,
32✔
89
            NodeType::Sink(s) => s.prepare(schemas.input_schemas.clone())?,
32✔
90
            NodeType::Processor(p) => p.prepare(
78✔
91
                schemas.input_schemas.clone(),
78✔
92
                schemas.output_schemas.clone(),
78✔
93
            )?,
78✔
94
        }
×
95
    }
×
96
    Ok(())
32✔
97
}
32✔
98

×
99
/// In topological order, pass output schemas to downstream nodes' input schemas.
100
fn populate_schemas<T: Clone>(
93✔
101
    dag: &Dag<T>,
93✔
102
) -> Result<HashMap<NodeHandle, NodeSchemas<T>>, ExecutionError> {
93✔
103
    let mut dag_schemas = dag
93✔
104
        .node_handles()
93✔
105
        .iter()
93✔
106
        .map(|node_handle| (node_handle.clone(), NodeSchemas::<T>::new()))
398✔
107
        .collect::<HashMap<_, _>>();
93✔
108

×
109
    for node_handle in dag.topo() {
398✔
110
        match dag.node_from_handle(node_handle) {
398✔
111
            NodeType::Source(source) => {
107✔
112
                let ports = source.get_output_ports()?;
107✔
113
                for port in ports {
244✔
114
                    let (schema, ctx) = source.get_output_schema(&port.handle)?;
137✔
115
                    let schema = prepare_schema_based_on_output_type(schema, port.typ);
137✔
116
                    populate_output_schema(
137✔
117
                        dag,
137✔
118
                        &mut dag_schemas,
137✔
119
                        node_handle,
137✔
120
                        port.handle,
137✔
121
                        (schema, ctx),
137✔
122
                    );
137✔
123
                }
×
124
            }
×
125

×
126
            NodeType::Processor(processor) => {
194✔
127
                let input_schemas =
194✔
128
                    validate_input_schemas(&dag_schemas, node_handle, processor.get_input_ports())?;
194✔
129

130
                let ports = processor.get_output_ports();
194✔
131
                for port in ports {
388✔
132
                    let (schema, ctx) =
194✔
133
                        processor.get_output_schema(&port.handle, &input_schemas)?;
194✔
134
                    let schema = prepare_schema_based_on_output_type(schema, port.typ);
194✔
135
                    populate_output_schema(
194✔
136
                        dag,
194✔
137
                        &mut dag_schemas,
194✔
138
                        node_handle,
194✔
139
                        port.handle,
194✔
140
                        (schema, ctx),
194✔
141
                    );
194✔
142
                }
×
143
            }
×
144

×
145
            NodeType::Sink(sink) => {
97✔
146
                validate_input_schemas(&dag_schemas, node_handle, sink.get_input_ports())?;
97✔
147
            }
×
148
        }
×
149
    }
×
150

×
151
    Ok(dag_schemas)
93✔
152
}
93✔
153

×
154
fn prepare_schema_based_on_output_type(schema: Schema, typ: OutputPortType) -> Schema {
1,087✔
155
    match typ {
1,087✔
156
        OutputPortType::Stateless | OutputPortType::StatefulWithPrimaryKeyLookup { .. } => schema,
1,086✔
157
        OutputPortType::AutogenRowKeyLookup => {
×
158
            AutogenRowKeyLookupRecordWriter::prepare_schema(schema)
1✔
159
        }
×
160
    }
×
161
}
1,087✔
162

×
163
fn populate_output_schema<T: Clone>(
331✔
164
    dag: &Dag<T>,
331✔
165
    dag_schemas: &mut HashMap<NodeHandle, NodeSchemas<T>>,
331✔
166
    node_handle: &NodeHandle,
331✔
167
    port: PortHandle,
331✔
168
    schema: (Schema, T),
331✔
169
) {
331✔
170
    dag_schemas
331✔
171
        .get_mut(node_handle)
331✔
172
        .expect("BUG")
331✔
173
        .output_schemas
331✔
174
        .insert(port, schema.clone());
331✔
175

176
    for (next_node_handle, next_node_port) in dag.edges_from_endpoint(node_handle, port) {
335✔
177
        let next_node_schemas = dag_schemas.get_mut(next_node_handle).expect("BUG");
323✔
178
        next_node_schemas
323✔
179
            .input_schemas
323✔
180
            .insert(next_node_port, schema.clone());
323✔
181
    }
323✔
182
}
331✔
183

×
184
fn validate_input_schemas<T: Clone>(
291✔
185
    dag_schemas: &HashMap<NodeHandle, NodeSchemas<T>>,
291✔
186
    node_handle: &NodeHandle,
291✔
187
    input_ports: Vec<PortHandle>,
291✔
188
) -> Result<HashMap<PortHandle, (Schema, T)>, ExecutionError> {
291✔
189
    let input_schemas = &dag_schemas.get(node_handle).expect("BUG").input_schemas;
291✔
190
    if !eq_ignore_order(input_schemas.keys().copied(), input_ports) {
291✔
191
        return Err(ExecutionError::MissingInput {
×
192
            node: node_handle.clone(),
×
193
        });
×
194
    }
291✔
195
    Ok(input_schemas.clone())
291✔
196
}
291✔
197

×
198
/// Returns `true` if `a` and `b` hold the same elements with the same multiplicities,
/// regardless of order.
///
/// `Ord` already implies `PartialEq`, so no separate equality bound is needed. Both sides
/// are sorted before comparison, which makes this O(n log n).
fn eq_ignore_order<I: Ord>(a: impl Iterator<Item = I>, mut b: Vec<I>) -> bool {
    let mut a = a.collect::<Vec<_>>();
    // Sequences of different lengths can never match; skip the sorts entirely.
    if a.len() != b.len() {
        return false;
    }
    // Stability is irrelevant for an equality check, and the unstable sort doesn't allocate.
    a.sort_unstable();
    b.sort_unstable();
    a == b
}
291✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc