• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.7
/dozer-core/src/dag/dag.rs
1
use daggy::petgraph::visit::{Bfs, EdgeRef, IntoEdges, Topo};
2
use daggy::Walker;
3

4
use crate::dag::errors::ExecutionError;
5
use crate::dag::node::{NodeHandle, PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
6

7
use std::collections::{HashMap, HashSet};
8
use std::sync::Arc;
9

10
pub const DEFAULT_PORT_HANDLE: u16 = 0xffff_u16;
11

×
12
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
1,991✔
13
pub struct Endpoint {
14
    pub node: NodeHandle,
15
    pub port: PortHandle,
16
}
17

18
impl Endpoint {
×
19
    pub fn new(node: NodeHandle, port: PortHandle) -> Self {
2,034✔
20
        Self { node, port }
2,034✔
21
    }
2,034✔
22
}
23

×
24
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
688✔
25
pub struct Edge {
26
    pub from: Endpoint,
27
    pub to: Endpoint,
28
}
29

30
impl Edge {
×
31
    pub fn new(from: Endpoint, to: Endpoint) -> Self {
1,013✔
32
        Self { from, to }
1,013✔
33
    }
1,013✔
34
}
35

36
pub enum NodeType<T> {
37
    Source(Arc<dyn SourceFactory<T>>),
38
    Processor(Arc<dyn ProcessorFactory<T>>),
39
    Sink(Arc<dyn SinkFactory<T>>),
40
}
41

42
struct EdgeType {
43
    from: PortHandle,
44
    to: PortHandle,
45
}
46

47
impl EdgeType {
48
    fn new(from: PortHandle, to: PortHandle) -> Self {
574✔
49
        Self { from, to }
574✔
50
    }
574✔
51
}
×
52

53
pub struct Dag<T> {
54
    /// The underlying graph.
55
    graph: daggy::Dag<NodeType<T>, EdgeType>,
56
    /// Map from node handle to node index.
57
    node_lookup_table: HashMap<NodeHandle, daggy::NodeIndex>,
58
    /// Map from node index to node handle.
×
59
    node_handles: Vec<NodeHandle>,
×
60
    /// All edge handles.
×
61
    edge_handles: HashSet<Edge>,
62
}
63

64
impl<T> Default for Dag<T> {
×
65
    fn default() -> Self {
×
66
        Self::new()
×
67
    }
×
68
}
×
69

×
70
impl<T> Dag<T> {
71
    /// Creates an empty DAG.
×
72
    pub fn new() -> Self {
70✔
73
        Self {
70✔
74
            graph: daggy::Dag::new(),
70✔
75
            node_lookup_table: HashMap::new(),
70✔
76
            node_handles: vec![],
70✔
77
            edge_handles: HashSet::new(),
70✔
78
        }
70✔
79
    }
70✔
80

×
81
    /// Adds a node. Panics if the `handle` exists in the `Dag`.
×
82
    pub fn add_node(&mut self, node_builder: NodeType<T>, handle: NodeHandle) {
272✔
83
        let node_index = self.graph.add_node(node_builder);
272✔
84
        self.node_handles.push(handle.clone());
272✔
85
        if let Some(node_index) = self.node_lookup_table.insert(handle, node_index) {
272✔
86
            panic!("A node {node_index:?} has already been inserted using specified node handle");
×
87
        }
272✔
88
    }
272✔
89

×
90
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
×
91
    ///
92
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
×
93
    pub fn connect(&mut self, from: Endpoint, to: Endpoint) -> Result<(), ExecutionError> {
212✔
94
        let from_node_index = validate_endpoint(self, &from, PortDirection::Output)?;
212✔
95
        let to_node_index = validate_endpoint(self, &to, PortDirection::Input)?;
209✔
96
        let edge_index = self.graph.add_edge(
208✔
97
            from_node_index,
208✔
98
            to_node_index,
208✔
99
            EdgeType::new(from.port, to.port),
208✔
100
        )?;
208✔
101

102
        if !self.edge_handles.insert(Edge::new(from, to)) {
208✔
103
            panic!("An edge {edge_index:?} has already been inserted using specified edge handle");
×
104
        }
208✔
105

208✔
106
        Ok(())
208✔
107
    }
212✔
108

×
109
    /// Adds another whole `Dag` to `self`. Optionally under a namespace `ns`.
×
110
    pub fn merge(&mut self, ns: Option<u16>, other: Dag<T>) {
4✔
111
        let (other_nodes, _) = other.graph.into_graph().into_nodes_edges();
4✔
112

4✔
113
        // Insert nodes.
4✔
114
        let mut other_node_handle_to_self_node_handle = HashMap::new();
4✔
115
        for (other_node, other_node_handle) in
8✔
116
            other_nodes.into_iter().zip(other.node_handles.into_iter())
4✔
117
        {
8✔
118
            let self_node_handle =
8✔
119
                NodeHandle::new(ns.or(other_node_handle.ns), other_node_handle.id.clone());
8✔
120
            self.add_node(other_node.weight, self_node_handle.clone());
8✔
121
            other_node_handle_to_self_node_handle.insert(other_node_handle, self_node_handle);
8✔
122
        }
8✔
123

×
124
        // Insert edges.
×
125
        let map_endpoint = |other_endpoint: Endpoint| {
8✔
126
            let self_node_handle = other_node_handle_to_self_node_handle
8✔
127
                .get(&other_endpoint.node)
8✔
128
                .expect("BUG in DAG");
8✔
129
            Endpoint::new(self_node_handle.clone(), other_endpoint.port)
8✔
130
        };
8✔
131
        for other_edge_handle in other.edge_handles.into_iter() {
4✔
132
            let from = map_endpoint(other_edge_handle.from);
4✔
133
            let to = map_endpoint(other_edge_handle.to);
4✔
134
            self.connect(from, to).expect("BUG in DAG");
4✔
135
        }
4✔
136
    }
4✔
137

×
138
    /// Returns an iterator over all node handles.
139
    pub fn node_handles(&self) -> &[NodeHandle] {
882✔
140
        &self.node_handles
882✔
141
    }
882✔
142

143
    /// Returns an iterator over all nodes.
144
    pub fn nodes(&self) -> impl Iterator<Item = (&NodeHandle, &NodeType<T>)> {
352✔
145
        self.node_lookup_table
352✔
146
            .iter()
352✔
147
            .map(|(node_handle, node_index)| {
1,450✔
148
                (
1,450✔
149
                    node_handle,
1,450✔
150
                    self.graph.node_weight(*node_index).expect("BUG in DAG"),
1,450✔
151
                )
1,450✔
152
            })
1,450✔
153
    }
352✔
154

×
155
    /// Returns an iterator over source handles and sources.
×
156
    pub fn sources(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SourceFactory<T>>)> {
174✔
157
        self.nodes().flat_map(|(node_handle, node)| {
174✔
158
            if let NodeType::Source(source) = node {
732✔
159
                Some((node_handle, source))
207✔
160
            } else {
×
161
                None
525✔
162
            }
×
163
        })
732✔
164
    }
174✔
165

×
166
    /// Returns an iterator over processor handles and processors.
×
167
    pub fn processors(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn ProcessorFactory<T>>)> {
57✔
168
        self.nodes().flat_map(|(node_handle, node)| {
57✔
169
            if let NodeType::Processor(processor) = node {
240✔
170
                Some((node_handle, processor))
112✔
171
            } else {
172
                None
128✔
173
            }
×
174
        })
240✔
175
    }
57✔
176

×
177
    /// Returns an iterator over sink handles and sinks.
×
178
    pub fn sinks(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SinkFactory<T>>)> {
89✔
179
        self.nodes().flat_map(|(node_handle, node)| {
89✔
180
            if let NodeType::Sink(sink) = node {
336✔
181
                Some((node_handle, sink))
93✔
182
            } else {
×
183
                None
243✔
184
            }
×
185
        })
336✔
186
    }
89✔
187

×
188
    /// Returns an iterator over all edge handles.
×
189
    pub fn edges(&self) -> impl Iterator<Item = &Edge> {
243✔
190
        self.edge_handles.iter()
243✔
191
    }
243✔
192

×
193
    /// Finds the node by its handle.
×
194
    pub fn node_from_handle(&self, handle: &NodeHandle) -> &NodeType<T> {
398✔
195
        let node_index = self.node_index(handle);
398✔
196
        self.graph.node_weight(node_index).expect("BUG in DAG")
398✔
197
    }
398✔
198

×
199
    /// Returns an iterator over node handles that are connected to the given node handle.
200
    pub fn edges_from_handle(&self, handle: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
201
        let node_index = self.node_index(handle);
×
202
        self.graph
×
203
            .edges(node_index)
×
204
            .map(|edge| &self.node_handles[edge.target().index()])
×
205
    }
×
206

207
    /// Returns an iterator over endpoints that are connected to the given endpoint.
×
208
    pub fn edges_from_endpoint<'a>(
331✔
209
        &'a self,
331✔
210
        node_handle: &'a NodeHandle,
331✔
211
        port_handle: PortHandle,
331✔
212
    ) -> impl Iterator<Item = (&NodeHandle, PortHandle)> {
331✔
213
        self.graph
331✔
214
            .edges(self.node_index(node_handle))
331✔
215
            .filter(move |edge| edge.weight().from == port_handle)
388✔
216
            .map(|edge| (&self.node_handles[edge.target().index()], edge.weight().to))
335✔
217
    }
331✔
218

219
    /// Returns an iterator over all node handles reachable from `start` in a breadth-first search.
220
    pub fn bfs(&self, start: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
271✔
221
        let start = self.node_index(start);
271✔
222

271✔
223
        Bfs::new(self.graph.graph(), start)
271✔
224
            .iter(self.graph.graph())
271✔
225
            .map(|node_index| &self.node_handles[node_index.index()])
680✔
226
    }
271✔
227

228
    /// Returns an iterator over all node handles in topological order.
229
    pub fn topo(&self) -> impl Iterator<Item = &NodeHandle> {
93✔
230
        Topo::new(self.graph.graph())
93✔
231
            .iter(self.graph.graph())
93✔
232
            .map(|node_index| &self.node_handles[node_index.index()])
398✔
233
    }
93✔
234
}
235

236
impl<T> Dag<T> {
237
    fn node_index(&self, node_handle: &NodeHandle) -> daggy::NodeIndex {
1,421✔
238
        *self
1,421✔
239
            .node_lookup_table
1,421✔
240
            .get(node_handle)
1,421✔
241
            .unwrap_or_else(|| panic!("Node handle {node_handle:?} not found in dag"))
1,421✔
242
    }
1,421✔
243
}
244

245
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
421✔
246
enum PortDirection {
247
    Input,
248
    Output,
249
}
250

251
fn validate_endpoint<T>(
421✔
252
    dag: &Dag<T>,
421✔
253
    endpoint: &Endpoint,
421✔
254
    direction: PortDirection,
421✔
255
) -> Result<daggy::NodeIndex, ExecutionError> {
421✔
256
    let node_index = dag.node_index(&endpoint.node);
421✔
257
    let node = dag.graph.node_weight(node_index).expect("BUG in DAG");
421✔
258
    if !contains_port(node, direction, endpoint.port)? {
421✔
259
        return Err(ExecutionError::InvalidPortHandle(endpoint.port));
4✔
260
    }
417✔
261
    Ok(node_index)
417✔
262
}
421✔
263

264
fn contains_port<T>(
421✔
265
    node: &NodeType<T>,
421✔
266
    direction: PortDirection,
421✔
267
    port: PortHandle,
421✔
268
) -> Result<bool, ExecutionError> {
421✔
269
    Ok(match node {
421✔
270
        NodeType::Processor(p) => {
260✔
271
            if direction == PortDirection::Output {
260✔
272
                p.get_output_ports().iter().any(|e| e.handle == port)
119✔
273
            } else {
274
                p.get_input_ports().contains(&port)
141✔
275
            }
276
        }
277
        NodeType::Sink(s) => {
68✔
278
            if direction == PortDirection::Output {
68✔
279
                false
×
280
            } else {
281
                s.get_input_ports().contains(&port)
68✔
282
            }
283
        }
284
        NodeType::Source(s) => {
93✔
285
            if direction == PortDirection::Output {
93✔
286
                s.get_output_ports()?.iter().any(|e| e.handle == port)
110✔
287
            } else {
288
                false
×
289
            }
290
        }
291
    })
292
}
421✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc