• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.77
/dozer-core/src/dag/dag_schemas.rs
1
use crate::dag::dag::{Dag, NodeType};
2
use crate::dag::errors::ExecutionError;
3
use crate::dag::errors::ExecutionError::InvalidNodeHandle;
4

5
use crate::dag::node::{NodeHandle, OutputPortType, PortHandle};
6
use crate::dag::record_store::AutogenRowKeyLookupRecordWriter;
7
use dozer_types::types::Schema;
8
use std::collections::HashMap;
9

10
#[derive(Clone)]
248✔
11
pub struct NodeSchemas<T> {
12
    pub input_schemas: HashMap<PortHandle, (Schema, T)>,
13
    pub output_schemas: HashMap<PortHandle, (Schema, T)>,
14
}
15

16
impl<T> Default for NodeSchemas<T> {
17
    fn default() -> Self {
×
18
        Self::new()
×
19
    }
×
20
}
21

22
impl<T> NodeSchemas<T> {
23
    pub fn new() -> Self {
398✔
24
        Self {
398✔
25
            input_schemas: HashMap::new(),
398✔
26
            output_schemas: HashMap::new(),
398✔
27
        }
398✔
28
    }
398✔
29
    pub fn from(
×
30
        input_schemas: HashMap<PortHandle, (Schema, T)>,
×
31
        output_schemas: HashMap<PortHandle, (Schema, T)>,
×
32
    ) -> Self {
×
33
        Self {
×
34
            input_schemas,
×
35
            output_schemas,
×
36
        }
×
37
    }
×
38
}
39

40
pub struct DagSchemas<T: Clone> {
41
    schemas: HashMap<NodeHandle, NodeSchemas<T>>,
42
}
43

44
impl<T: Clone> DagSchemas<T> {
45
    pub fn new(dag: &Dag<T>) -> Result<Self, ExecutionError> {
93✔
46
        let schemas = populate_schemas(dag)?;
93✔
47
        Ok(Self { schemas })
93✔
48
    }
93✔
49

×
50
    pub fn get_node_input_schemas(
34✔
51
        &self,
34✔
52
        handle: &NodeHandle,
34✔
53
    ) -> Result<&HashMap<PortHandle, (Schema, T)>, ExecutionError> {
34✔
54
        let node = self
34✔
55
            .schemas
34✔
56
            .get(handle)
34✔
57
            .ok_or_else(|| InvalidNodeHandle(handle.clone()))?;
34✔
58
        Ok(&node.input_schemas)
34✔
59
    }
34✔
60

61
    pub fn get_node_output_schemas(
3✔
62
        &self,
3✔
63
        handle: &NodeHandle,
3✔
64
    ) -> Result<&HashMap<PortHandle, (Schema, T)>, ExecutionError> {
3✔
65
        let node = self
3✔
66
            .schemas
3✔
67
            .get(handle)
3✔
68
            .ok_or_else(|| InvalidNodeHandle(handle.clone()))?;
3✔
69
        Ok(&node.output_schemas)
3✔
70
    }
3✔
71

×
72
    pub fn get_all_schemas(&self) -> &HashMap<NodeHandle, NodeSchemas<T>> {
202✔
73
        &self.schemas
202✔
74
    }
202✔
75
}
76

×
77
pub fn prepare_dag<T: Clone>(
32✔
78
    dag: &Dag<T>,
32✔
79
    dag_schemas: &DagSchemas<T>,
32✔
80
) -> Result<(), ExecutionError> {
32✔
81
    for (handle, node) in dag.nodes() {
142✔
82
        let schemas = dag_schemas
142✔
83
            .get_all_schemas()
142✔
84
            .get(handle)
142✔
85
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?;
142✔
86

×
87
        match node {
142✔
88
            NodeType::Source(s) => s.prepare(schemas.output_schemas.clone())?,
32✔
89
            NodeType::Sink(s) => s.prepare(schemas.input_schemas.clone())?,
32✔
90
            NodeType::Processor(p) => p.prepare(
78✔
91
                schemas.input_schemas.clone(),
78✔
92
                schemas.output_schemas.clone(),
78✔
93
            )?,
78✔
94
        }
×
95
    }
×
96
    Ok(())
32✔
97
}
32✔
98

×
99
/// In topological order, pass output schemas to downstream nodes' input schemas.
100
fn populate_schemas<T: Clone>(
93✔
101
    dag: &Dag<T>,
93✔
102
) -> Result<HashMap<NodeHandle, NodeSchemas<T>>, ExecutionError> {
93✔
103
    let mut dag_schemas = dag
93✔
104
        .node_handles()
93✔
105
        .iter()
93✔
106
        .map(|node_handle| (node_handle.clone(), NodeSchemas::<T>::new()))
398✔
107
        .collect::<HashMap<_, _>>();
93✔
108

×
109
    for node_handle in dag.topo() {
398✔
110
        match dag.node_from_handle(node_handle) {
398✔
111
            NodeType::Source(source) => {
107✔
112
                let ports = source.get_output_ports()?;
107✔
113
                for port in ports {
244✔
114
                    let (schema, ctx) = source.get_output_schema(&port.handle)?;
137✔
115
                    let schema = prepare_schema_based_on_output_type(schema, port.typ);
137✔
116
                    populate_output_schema(
137✔
117
                        dag,
137✔
118
                        &mut dag_schemas,
137✔
119
                        node_handle,
137✔
120
                        port.handle,
137✔
121
                        (schema, ctx),
137✔
122
                    );
137✔
123
                }
×
124
            }
×
125

×
126
            NodeType::Processor(processor) => {
194✔
127
                let input_schemas = dag_schemas
194✔
128
                    .get(node_handle)
194✔
129
                    .expect("BUG")
194✔
130
                    .input_schemas
194✔
131
                    .clone();
194✔
132
                if !eq_ignore_order(input_schemas.keys(), processor.get_input_ports().iter()) {
194✔
133
                    return Err(ExecutionError::MissingInput {
×
134
                        node: node_handle.clone(),
×
135
                    });
×
136
                }
194✔
137

194✔
138
                let ports = processor.get_output_ports();
194✔
139
                for port in ports {
388✔
140
                    let (schema, ctx) =
194✔
141
                        processor.get_output_schema(&port.handle, &input_schemas)?;
194✔
142
                    let schema = prepare_schema_based_on_output_type(schema, port.typ);
194✔
143
                    populate_output_schema(
194✔
144
                        dag,
194✔
145
                        &mut dag_schemas,
194✔
146
                        node_handle,
194✔
147
                        port.handle,
194✔
148
                        (schema, ctx),
194✔
149
                    );
194✔
150
                }
×
151
            }
×
152

×
153
            NodeType::Sink(_) => (),
97✔
154
        }
155
    }
×
156

×
157
    Ok(dag_schemas)
93✔
158
}
93✔
159

×
160
fn prepare_schema_based_on_output_type(schema: Schema, typ: OutputPortType) -> Schema {
1,087✔
161
    match typ {
1,087✔
162
        OutputPortType::Stateless | OutputPortType::StatefulWithPrimaryKeyLookup { .. } => schema,
1,086✔
163
        OutputPortType::AutogenRowKeyLookup => {
×
164
            AutogenRowKeyLookupRecordWriter::prepare_schema(schema)
1✔
165
        }
166
    }
167
}
1,087✔
168

169
fn populate_output_schema<T: Clone>(
331✔
170
    dag: &Dag<T>,
331✔
171
    dag_schemas: &mut HashMap<NodeHandle, NodeSchemas<T>>,
331✔
172
    node_handle: &NodeHandle,
331✔
173
    port: PortHandle,
331✔
174
    schema: (Schema, T),
331✔
175
) {
331✔
176
    dag_schemas
331✔
177
        .get_mut(node_handle)
331✔
178
        .expect("BUG")
331✔
179
        .output_schemas
331✔
180
        .insert(port, schema.clone());
331✔
181

×
182
    for (next_node_handle, next_node_port) in dag.edges_from_endpoint(node_handle, port) {
335✔
183
        let next_node_schemas = dag_schemas.get_mut(next_node_handle).expect("BUG");
323✔
184
        next_node_schemas
323✔
185
            .input_schemas
323✔
186
            .insert(next_node_port, schema.clone());
323✔
187
    }
323✔
188
}
331✔
189

×
190
/// Returns `true` when `a` and `b` yield the same items, ignoring order.
///
/// Duplicates are significant: both iterators must yield each item the same
/// number of times.
fn eq_ignore_order<I: Ord>(a: impl Iterator<Item = I>, b: impl Iterator<Item = I>) -> bool {
    let mut a = a.collect::<Vec<_>>();
    let mut b = b.collect::<Vec<_>>();

    // Different lengths can never match; skip the sorting work.
    if a.len() != b.len() {
        return false;
    }

    // Only equality of the sorted sequences matters, so a stable sort is not needed.
    a.sort_unstable();
    b.sort_unstable();
    a == b
}
194✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc