• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829321272

pending completion
4829321272

Pull #1515

github

GitHub
Merge b6d982211 into f2ab0e6ce
Pull Request #1515: feat: Run migration only when necessary

193 of 193 new or added lines in 11 files covered. (100.0%)

35565 of 45252 relevant lines covered (78.59%)

16462.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.06
/dozer-core/src/executor.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::dag_schemas::DagSchemas;
3
use crate::errors::ExecutionError;
4
use crate::Dag;
5

6
use daggy::petgraph::visit::IntoNodeIdentifiers;
7
use dozer_types::node::NodeHandle;
8

9
use dozer_types::serde::{self, Deserialize, Serialize};
10
use std::collections::hash_map::Entry;
11
use std::collections::HashMap;
12
use std::fmt::Debug;
13
use std::panic::panic_any;
14
use std::sync::atomic::AtomicBool;
15
use std::sync::Arc;
16
use std::thread::JoinHandle;
17
use std::thread::{self, Builder};
18
use std::time::Duration;
19

20
/// Tuning knobs for a [`DagExecutor`].
///
/// Use [`ExecutorOptions::default`] for the stock configuration and override
/// individual fields with struct-update syntax when needed.
#[derive(Clone, Debug)]
pub struct ExecutorOptions {
    /// Commit size threshold (default `10_000`).
    // NOTE(review): presumably the number of operations batched before a
    // commit is issued; the consumer is not in this file — confirm.
    pub commit_sz: u32,
    /// Buffer size handed to `ExecutionDag::new` when the executor starts
    /// (default `20_000`).
    pub channel_buffer_sz: usize,
    /// Commit time threshold (default 50 ms).
    // NOTE(review): presumably the maximum time to wait before committing even
    // if `commit_sz` has not been reached; the consumer is not in this file —
    // confirm.
    pub commit_time_threshold: Duration,
}

impl Default for ExecutorOptions {
    /// Returns the stock configuration: `commit_sz = 10_000`,
    /// `channel_buffer_sz = 20_000`, `commit_time_threshold = 50ms`.
    fn default() -> Self {
        Self {
            commit_sz: 10_000,
            channel_buffer_sz: 20_000,
            commit_time_threshold: Duration::from_millis(50),
        }
    }
}
36

37
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1,993✔
38
#[serde(crate = "self::serde")]
39
pub(crate) enum InputPortState {
40
    Open,
41
    Terminated,
42
}
43

44
mod execution_dag;
45
mod name;
46
mod node;
47
mod processor_node;
48
mod receiver_loop;
49
mod sink_node;
50
mod source_node;
51

52
use node::Node;
53
use processor_node::ProcessorNode;
54
use sink_node::SinkNode;
55

56
use self::execution_dag::ExecutionDag;
57
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
58

59
pub struct DagExecutor {
60
    builder_dag: BuilderDag,
61
    options: ExecutorOptions,
62
}
63

64
pub struct DagExecutorJoinHandle {
65
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
66
}
67

68
impl DagExecutor {
69
    pub fn new<T: Clone + Debug>(
91✔
70
        dag: Dag<T>,
91✔
71
        options: ExecutorOptions,
91✔
72
    ) -> Result<Self, ExecutionError> {
91✔
73
        let dag_schemas = DagSchemas::new(dag)?;
91✔
74
        let builder_dag = BuilderDag::new(dag_schemas)?;
91✔
75

76
        Ok(Self {
87✔
77
            builder_dag,
87✔
78
            options,
87✔
79
        })
87✔
80
    }
89✔
81

82
    pub fn validate<T: Clone + Debug>(dag: Dag<T>) -> Result<(), ExecutionError> {
83
        DagSchemas::new(dag)?;
12✔
84
        Ok(())
12✔
85
    }
12✔
86

87
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
376✔
88
        // Construct execution dag.
89
        let mut execution_dag =
376✔
90
            ExecutionDag::new(self.builder_dag, self.options.channel_buffer_sz)?;
376✔
91
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
376✔
92

376✔
93
        // Start the threads.
376✔
94
        let mut join_handles = HashMap::new();
376✔
95
        for node_index in node_indexes {
2,234✔
96
            let node = execution_dag.graph()[node_index]
1,858✔
97
                .as_ref()
1,858✔
98
                .expect("We created all nodes");
1,858✔
99
            let node_handle = node.handle.clone();
1,858✔
100
            match &node.kind {
1,858✔
101
                NodeKind::Source(_, _) => {
102
                    let (source_sender_node, source_listener_node) = create_source_nodes(
383✔
103
                        &mut execution_dag,
383✔
104
                        node_index,
383✔
105
                        &self.options,
383✔
106
                        running.clone(),
383✔
107
                    );
383✔
108
                    join_handles.insert(
383✔
109
                        node_handle,
383✔
110
                        start_source(source_sender_node, source_listener_node)?,
383✔
111
                    );
112
                }
113
                NodeKind::Processor(_) => {
114
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,095✔
115
                    join_handles.insert(node_handle, start_processor(processor_node)?);
1,095✔
116
                }
117
                NodeKind::Sink(_) => {
118
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
380✔
119
                    join_handles.insert(node_handle, start_sink(sink_node)?);
380✔
120
                }
121
            }
122
        }
123

124
        Ok(DagExecutorJoinHandle { join_handles })
376✔
125
    }
376✔
126
}
127

128
impl DagExecutorJoinHandle {
129
    pub fn join(mut self) -> Result<(), ExecutionError> {
375✔
130
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
1,855✔
131

132
        loop {
133
            for handle in &handles {
4,890✔
134
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
4,059✔
135
                    if entry.get().is_finished() {
4,059✔
136
                        if let Err(e) = entry.remove().join() {
1,849✔
137
                            panic_any(e);
3✔
138
                        }
1,846✔
139
                    }
2,210✔
140
                }
×
141
            }
142

143
            if self.join_handles.is_empty() {
831✔
144
                return Ok(());
372✔
145
            }
459✔
146

459✔
147
            thread::sleep(Duration::from_millis(250));
459✔
148
        }
149
    }
372✔
150
}
151

152
fn start_source(
383✔
153
    source_sender: SourceSenderNode,
383✔
154
    source_listener: SourceListenerNode,
383✔
155
) -> Result<JoinHandle<()>, ExecutionError> {
383✔
156
    let handle = source_sender.handle().clone();
383✔
157

158
    let _st_handle = Builder::new()
383✔
159
        .name(format!("{handle}-sender"))
383✔
160
        .spawn(move || match source_sender.run() {
383✔
161
            Ok(_) => {}
378✔
162
            // Channel disconnection means the source listener has quit.
163
            // Maybe it quit gracefully so we don't need to panic.
164
            Err(ExecutionError::CannotSendToChannel) => {}
4✔
165
            // Other errors result in panic.
166
            Err(e) => std::panic::panic_any(e),
1✔
167
        })?;
383✔
168

169
    Ok(Builder::new()
383✔
170
        .name(format!("{handle}-listener"))
383✔
171
        .spawn(move || {
383✔
172
            if let Err(e) = source_listener.run() {
383✔
173
                std::panic::panic_any(e);
3✔
174
            }
380✔
175
        })?)
383✔
176
}
383✔
177

178
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,095✔
179
    Ok(Builder::new()
1,095✔
180
        .name(processor.handle().to_string())
1,095✔
181
        .spawn(move || {
1,095✔
182
            if let Err(e) = processor.run() {
1,095✔
183
                std::panic::panic_any(e);
3✔
184
            }
1,092✔
185
        })?)
1,095✔
186
}
1,095✔
187

188
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
380✔
189
    Ok(Builder::new().name(sink.handle().to_string()).spawn(|| {
380✔
190
        if let Err(e) = sink.run() {
380✔
191
            std::panic::panic_any(e);
3✔
192
        }
377✔
193
    })?)
380✔
194
}
380✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc