• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829268814

pending completion
4829268814

Pull #1516

github

GitHub
Merge 845b68ec1 into f2ab0e6ce
Pull Request #1516: Prepare v0.1.19

35090 of 44737 relevant lines covered (78.44%)

11496.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.2
/dozer-core/src/dag_impl.rs
1
use daggy::petgraph::dot;
2
use daggy::petgraph::visit::{Bfs, EdgeRef, IntoEdges};
3
use daggy::Walker;
4
use dozer_types::node::NodeHandle;
5

6
use crate::errors::ExecutionError;
7
use crate::node::{PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
8
use std::collections::{HashMap, HashSet};
9
use std::fmt::{Debug, Display};
10
use std::sync::Arc;
11

12
pub const DEFAULT_PORT_HANDLE: u16 = 0xffff_u16;
13

14
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
153✔
15
pub struct Endpoint {
16
    pub node: NodeHandle,
17
    pub port: PortHandle,
18
}
19

20
impl Endpoint {
21
    pub fn new(node: NodeHandle, port: PortHandle) -> Self {
6,097✔
22
        Self { node, port }
6,097✔
23
    }
6,097✔
24
}
25

26
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
21✔
27
pub struct Edge {
28
    pub from: Endpoint,
29
    pub to: Endpoint,
30
}
31

32
impl Edge {
33
    pub fn new(from: Endpoint, to: Endpoint) -> Self {
1,309✔
34
        Self { from, to }
1,309✔
35
    }
1,309✔
36
}
37

38
#[derive(Debug, Clone)]
47✔
39
/// A `SourceFactory`, `ProcessorFactory` or `SinkFactory`.
40
pub enum NodeKind<T> {
41
    Source(Arc<dyn SourceFactory<T>>),
42
    Processor(Arc<dyn ProcessorFactory<T>>),
43
    Sink(Arc<dyn SinkFactory<T>>),
44
}
45

46
#[derive(Debug, Clone)]
47✔
47
/// The node type of the description DAG.
48
pub struct NodeType<T> {
49
    /// The node handle, unique across the DAG.
50
    pub handle: NodeHandle,
51
    /// The node kind.
52
    pub kind: NodeKind<T>,
53
}
54

55
impl<T> Display for NodeType<T> {
56
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
57
        write!(f, "{:?}", self.handle)
×
58
    }
×
59
}
60

61
#[derive(Debug, Clone, Copy)]
46✔
62
/// The edge type of the description DAG.
63
pub struct EdgeType {
64
    pub from: PortHandle,
65
    pub to: PortHandle,
66
}
67

68
impl EdgeType {
69
    pub fn new(from: PortHandle, to: PortHandle) -> Self {
1,757✔
70
        Self { from, to }
1,757✔
71
    }
1,757✔
72
}
73
impl Display for EdgeType {
74
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
75
        write!(f, "{:?} -> {:?}", self.from, self.to)
×
76
    }
×
77
}
78

79
#[derive(Debug, Clone)]
13✔
80
pub struct Dag<T> {
81
    /// The underlying graph.
82
    graph: daggy::Dag<NodeType<T>, EdgeType>,
83
    /// Map from node handle to node index.
84
    node_lookup_table: HashMap<NodeHandle, daggy::NodeIndex>,
85
    /// All edge indexes.
86
    edge_indexes: HashSet<EdgeIndex>,
87
}
88

89
impl<T> Default for Dag<T> {
90
    fn default() -> Self {
×
91
        Self::new()
×
92
    }
×
93
}
94

95
impl<T> Display for Dag<T> {
96
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
97
        write!(f, "{}", dot::Dot::new(&self.graph))
×
98
    }
×
99
}
100

101
impl<T> Dag<T> {
102
    /// Creates an empty DAG.
103
    pub fn new() -> Self {
117✔
104
        Self {
117✔
105
            graph: daggy::Dag::new(),
117✔
106
            node_lookup_table: HashMap::new(),
117✔
107
            edge_indexes: HashSet::new(),
117✔
108
        }
117✔
109
    }
117✔
110

111
    /// Returns the underlying daggy graph.
112
    pub fn graph(&self) -> &daggy::Dag<NodeType<T>, EdgeType> {
1,036✔
113
        &self.graph
1,036✔
114
    }
1,036✔
115

116
    /// Consumes the `Dag` and returns the underlying daggy graph.
117
    pub fn into_graph(self) -> daggy::Dag<NodeType<T>, EdgeType> {
111✔
118
        self.graph
111✔
119
    }
111✔
120

121
    /// Prints the DAG in DOT format to standard output.
122
    pub fn print_dot(&self) {
×
123
        use std::println as info;
×
124
        info!("{}", dot::Dot::new(&self.graph));
×
125
    }
×
126

127
    /// Adds a source. Panics if the `handle` exists in the `Dag`.
128
    pub fn add_source(
120✔
129
        &mut self,
120✔
130
        handle: NodeHandle,
120✔
131
        source: Arc<dyn SourceFactory<T>>,
120✔
132
    ) -> daggy::NodeIndex {
120✔
133
        self.add_node(handle, NodeKind::Source(source))
120✔
134
    }
120✔
135

136
    /// Adds a processor. Panics if the `handle` exists in the `Dag`.
137
    pub fn add_processor(
262✔
138
        &mut self,
262✔
139
        handle: NodeHandle,
262✔
140
        processor: Arc<dyn ProcessorFactory<T>>,
262✔
141
    ) -> daggy::NodeIndex {
262✔
142
        self.add_node(handle, NodeKind::Processor(processor))
262✔
143
    }
262✔
144

145
    /// Adds a sink. Panics if the `handle` exists in the `Dag`.
146
    pub fn add_sink(
110✔
147
        &mut self,
110✔
148
        handle: NodeHandle,
110✔
149
        sink: Arc<dyn SinkFactory<T>>,
110✔
150
    ) -> daggy::NodeIndex {
110✔
151
        self.add_node(handle, NodeKind::Sink(sink))
110✔
152
    }
110✔
153

154
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
155
    ///
156
    /// Returns an error if either port cannot be found or the edge would create a cycle.
157
    pub fn connect(&mut self, from: Endpoint, to: Endpoint) -> Result<(), ExecutionError> {
415✔
158
        let from_node_index = validate_endpoint(self, &from, PortDirection::Output)?;
415✔
159
        let to_node_index = validate_endpoint(self, &to, PortDirection::Input)?;
412✔
160
        self.connect_with_index(from_node_index, from.port, to_node_index, to.port)
412✔
161
    }
416✔
162

163
    /// Adds an edge between two node indexes. Panics if the identical edge was already added.
164
    ///
165
    /// Returns an error if either port cannot be found or the edge would create a cycle.
166
    pub fn connect_with_index(
427✔
167
        &mut self,
427✔
168
        from_node_index: daggy::NodeIndex,
427✔
169
        output_port: PortHandle,
427✔
170
        to_node_index: daggy::NodeIndex,
427✔
171
        input_port: PortHandle,
427✔
172
    ) -> Result<(), ExecutionError> {
427✔
173
        validate_port_with_index(self, from_node_index, output_port, PortDirection::Output)?;
427✔
174
        validate_port_with_index(self, to_node_index, input_port, PortDirection::Input)?;
427✔
175
        let edge_index = self.graph.add_edge(
427✔
176
            from_node_index,
427✔
177
            to_node_index,
427✔
178
            EdgeType::new(output_port, input_port),
427✔
179
        )?;
427✔
180

181
        if !self.edge_indexes.insert(EdgeIndex {
427✔
182
            from_node: from_node_index,
427✔
183
            output_port,
427✔
184
            to_node: to_node_index,
427✔
185
            input_port,
427✔
186
        }) {
427✔
187
            panic!("An edge {edge_index:?} has already been inserted using specified edge handle");
1✔
188
        }
426✔
189

426✔
190
        Ok(())
426✔
191
    }
426✔
192

193
    /// Adds another whole `Dag` to `self`, optionally under a namespace `ns`.
194
    pub fn merge(&mut self, ns: Option<u16>, other: Dag<T>) {
4✔
195
        let (other_nodes, _) = other.graph.into_graph().into_nodes_edges();
4✔
196

4✔
197
        // Insert nodes.
4✔
198
        let mut other_node_index_to_self_node_index = vec![];
4✔
199
        for other_node in other_nodes.into_iter() {
8✔
200
            let other_node = other_node.weight;
8✔
201
            let self_node_handle =
8✔
202
                NodeHandle::new(ns.or(other_node.handle.ns), other_node.handle.id.clone());
8✔
203
            let self_node_index = self.add_node(self_node_handle.clone(), other_node.kind);
8✔
204
            other_node_index_to_self_node_index.push(self_node_index);
8✔
205
        }
8✔
206

207
        // Insert edges.
208
        for other_edge_index in other.edge_indexes.into_iter() {
4✔
209
            let self_from_node =
4✔
210
                other_node_index_to_self_node_index[other_edge_index.from_node.index()];
4✔
211
            let self_to_node =
4✔
212
                other_node_index_to_self_node_index[other_edge_index.to_node.index()];
4✔
213
            self.connect_with_index(
4✔
214
                self_from_node,
4✔
215
                other_edge_index.output_port,
4✔
216
                self_to_node,
4✔
217
                other_edge_index.input_port,
4✔
218
            )
4✔
219
            .expect("BUG in DAG");
4✔
220
        }
4✔
221
    }
4✔
222

223
    /// Returns an iterator over all node handles.
224
    pub fn node_handles(&self) -> impl Iterator<Item = &NodeHandle> {
×
225
        self.nodes().map(|node| &node.handle)
×
226
    }
×
227

228
    /// Returns an iterator over all nodes.
229
    pub fn nodes(&self) -> impl Iterator<Item = &NodeType<T>> {
×
230
        self.graph.raw_nodes().iter().map(|node| &node.weight)
×
231
    }
×
232

233
    /// Returns an iterator over source handles and sources.
234
    pub fn sources(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SourceFactory<T>>)> {
×
235
        self.nodes().flat_map(|node| {
×
236
            if let NodeKind::Source(source) = &node.kind {
×
237
                Some((&node.handle, source))
×
238
            } else {
239
                None
×
240
            }
241
        })
×
242
    }
×
243

244
    /// Returns an iterator over processor handles and processors.
245
    pub fn processors(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn ProcessorFactory<T>>)> {
×
246
        self.nodes().flat_map(|node| {
×
247
            if let NodeKind::Processor(processor) = &node.kind {
×
248
                Some((&node.handle, processor))
×
249
            } else {
250
                None
×
251
            }
252
        })
×
253
    }
×
254

255
    /// Returns an iterator over sink handles and sinks.
256
    pub fn sinks(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SinkFactory<T>>)> {
×
257
        self.nodes().flat_map(|node| {
×
258
            if let NodeKind::Sink(sink) = &node.kind {
×
259
                Some((&node.handle, sink))
×
260
            } else {
261
                None
×
262
            }
263
        })
×
264
    }
×
265

266
    /// Returns all edges of the DAG as a vector of `Edge`s.
267
    pub fn edge_handles(&self) -> Vec<Edge> {
1✔
268
        let get_endpoint = |node_index: daggy::NodeIndex, port_handle| {
12✔
269
            let node = &self.graph[node_index];
12✔
270
            Endpoint {
12✔
271
                node: node.handle.clone(),
12✔
272
                port: port_handle,
12✔
273
            }
12✔
274
        };
12✔
275

276
        self.edge_indexes
1✔
277
            .iter()
1✔
278
            .map(|edge_index| {
6✔
279
                Edge::new(
6✔
280
                    get_endpoint(edge_index.from_node, edge_index.output_port),
6✔
281
                    get_endpoint(edge_index.to_node, edge_index.input_port),
6✔
282
                )
6✔
283
            })
6✔
284
            .collect()
1✔
285
    }
1✔
286

287
    /// Returns the kind of the node with the given handle. Panics if the handle is unknown.
288
    pub fn node_kind_from_handle(&self, handle: &NodeHandle) -> &NodeKind<T> {
×
289
        &self.graph[self.node_index(handle)].kind
×
290
    }
×
291

292
    /// Returns an iterator over node handles that are connected to the given node handle.
293
    pub fn edges_from_handle(&self, handle: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
294
        let node_index = self.node_index(handle);
×
295
        self.graph
×
296
            .edges(node_index)
×
297
            .map(|edge| &self.graph[edge.target()].handle)
×
298
    }
×
299

300
    /// Returns an iterator over endpoints that are connected to the given endpoint.
301
    pub fn edges_from_endpoint<'a>(
×
302
        &'a self,
×
303
        node_handle: &'a NodeHandle,
×
304
        port_handle: PortHandle,
×
305
    ) -> impl Iterator<Item = (&NodeHandle, PortHandle)> {
×
306
        self.graph
×
307
            .edges(self.node_index(node_handle))
×
308
            .filter(move |edge| edge.weight().from == port_handle)
×
309
            .map(|edge| (&self.graph[edge.target()].handle, edge.weight().to))
×
310
    }
×
311

312
    /// Returns an iterator over all node handles reachable from `start` in a breadth-first search.
313
    pub fn bfs(&self, start: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
314
        let start = self.node_index(start);
×
315

×
316
        Bfs::new(self.graph.graph(), start)
×
317
            .iter(self.graph.graph())
×
318
            .map(|node_index| &self.graph[node_index].handle)
×
319
    }
×
320
}
321

322
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
647✔
323
struct EdgeIndex {
324
    from_node: daggy::NodeIndex,
325
    output_port: PortHandle,
326
    to_node: daggy::NodeIndex,
327
    input_port: PortHandle,
328
}
329

330
impl<T> Dag<T> {
331
    fn add_node(&mut self, handle: NodeHandle, kind: NodeKind<T>) -> daggy::NodeIndex {
499✔
332
        let node_index = self.graph.add_node(NodeType {
499✔
333
            handle: handle.clone(),
499✔
334
            kind,
499✔
335
        });
499✔
336
        if let Some(node_index) = self.node_lookup_table.insert(handle, node_index) {
499✔
337
            panic!("A node {node_index:?} has already been inserted using specified node handle");
×
338
        }
499✔
339
        node_index
499✔
340
    }
499✔
341

342
    fn node_index(&self, node_handle: &NodeHandle) -> daggy::NodeIndex {
828✔
343
        *self
828✔
344
            .node_lookup_table
828✔
345
            .get(node_handle)
828✔
346
            .unwrap_or_else(|| panic!("Node handle {node_handle:?} not found in dag"))
828✔
347
    }
828✔
348
}
349

350
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
1,682✔
351
enum PortDirection {
352
    Input,
353
    Output,
354
}
355

356
fn validate_endpoint<T>(
827✔
357
    dag: &Dag<T>,
827✔
358
    endpoint: &Endpoint,
827✔
359
    direction: PortDirection,
827✔
360
) -> Result<daggy::NodeIndex, ExecutionError> {
827✔
361
    let node_index = dag.node_index(&endpoint.node);
827✔
362
    validate_port_with_index(dag, node_index, endpoint.port, direction)?;
827✔
363
    Ok(node_index)
823✔
364
}
827✔
365

366
fn validate_port_with_index<T>(
1,681✔
367
    dag: &Dag<T>,
1,681✔
368
    node_index: daggy::NodeIndex,
1,681✔
369
    port: PortHandle,
1,681✔
370
    direction: PortDirection,
1,681✔
371
) -> Result<(), ExecutionError> {
1,681✔
372
    let node = &dag.graph[node_index];
1,681✔
373
    if !contains_port(&node.kind, direction, port)? {
1,681✔
374
        return Err(ExecutionError::InvalidPortHandle(port));
4✔
375
    }
1,679✔
376
    Ok(())
1,679✔
377
}
1,683✔
378

379
fn contains_port<T>(
1,682✔
380
    node: &NodeKind<T>,
1,682✔
381
    direction: PortDirection,
1,682✔
382
    port: PortHandle,
1,682✔
383
) -> Result<bool, ExecutionError> {
1,682✔
384
    Ok(match node {
1,682✔
385
        NodeKind::Processor(p) => {
1,135✔
386
            if direction == PortDirection::Output {
1,135✔
387
                p.get_output_ports().iter().any(|e| e.handle == port)
509✔
388
            } else {
389
                p.get_input_ports().contains(&port)
625✔
390
            }
391
        }
392
        NodeKind::Sink(s) => {
215✔
393
            if direction == PortDirection::Output {
215✔
394
                false
×
395
            } else {
396
                s.get_input_ports().contains(&port)
215✔
397
            }
398
        }
399
        NodeKind::Source(s) => {
332✔
400
            if direction == PortDirection::Output {
332✔
401
                s.get_output_ports().iter().any(|e| e.handle == port)
445✔
402
            } else {
403
                false
×
404
            }
405
        }
406
    })
407
}
1,681✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc