• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.59
/dozer-core/src/dag/executor.rs
1
#![allow(clippy::type_complexity)]
2

3
use crate::dag::dag::Dag;
4
use crate::dag::dag_metadata::{Consistency, DagMetadata, DagMetadataManager};
5
use crate::dag::dag_schemas::{DagSchemas, NodeSchemas};
6
use crate::dag::errors::ExecutionError;
7
use crate::dag::errors::ExecutionError::{IncompatibleSchemas, InconsistentCheckpointMetadata};
8
use crate::dag::executor_utils::index_edges;
9
use crate::dag::node::{NodeHandle, PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
10
use crate::dag::record_store::RecordReader;
11

12
use crossbeam::channel::{bounded, Receiver, Sender};
13
use dozer_types::parking_lot::RwLock;
14
use dozer_types::types::{Operation, Record, Schema};
15

16
use crate::dag::epoch::{Epoch, EpochManager};
17
use std::collections::hash_map::Entry;
18
use std::collections::HashMap;
19
use std::fmt::{Display, Formatter};
20
use std::panic::panic_any;
21
use std::path::{Path, PathBuf};
22
use std::sync::atomic::{AtomicBool, Ordering};
23
use std::sync::{Arc, Barrier};
24
use std::thread::JoinHandle;
25
use std::thread::{self, Builder};
26
use std::time::Duration;
27

28
/// Tunable knobs for [`DagExecutor`].
// `Debug` added so the options can be logged/inspected (public types should
// be `Debug`); derived impl is backward-compatible.
#[derive(Clone, Debug)]
pub struct ExecutorOptions {
    /// Commit batch size: number of operations between commits.
    pub commit_sz: u32,
    /// Capacity of each bounded inter-node crossbeam channel.
    pub channel_buffer_sz: usize,
    /// Maximum time allowed to elapse between two commits.
    pub commit_time_threshold: Duration,
}
34

35
impl Default for ExecutorOptions {
36
    fn default() -> Self {
159✔
37
        Self {
159✔
38
            commit_sz: 10_000,
159✔
39
            channel_buffer_sz: 20_000,
159✔
40
            commit_time_threshold: Duration::from_millis(50),
159✔
41
        }
159✔
42
    }
159✔
43
}
×
44

×
45
/// Lifecycle state of a node's input port (presumably consumed by the
/// `receiver_loop` module — confirm against its usage).
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum InputPortState {
    /// More operations may still arrive on this port.
    Open,
    /// The upstream side has signalled termination; no more operations.
    Terminated,
}
50

51
/// Message exchanged between executor nodes over the inter-node channels.
///
/// Extends the data-plane [`Operation`] variants (delete/insert/update)
/// with control-plane messages (`Commit`, `Terminate`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExecutorOperation {
    Delete { old: Record },
    Insert { new: Record },
    Update { old: Record, new: Record },
    /// Epoch boundary: carries the epoch being committed.
    Commit { epoch: Epoch },
    /// End-of-stream control message.
    Terminate,
}
59

60
impl ExecutorOperation {
61
    pub fn from_operation(op: Operation) -> ExecutorOperation {
×
62
        match op {
×
63
            Operation::Update { old, new } => ExecutorOperation::Update { old, new },
×
64
            Operation::Delete { old } => ExecutorOperation::Delete { old },
×
65
            Operation::Insert { new } => ExecutorOperation::Insert { new },
×
66
        }
×
67
    }
×
68
}
69

×
70
impl Display for ExecutorOperation {
71
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
72
        let type_str = match self {
×
73
            ExecutorOperation::Delete { .. } => "Delete",
×
74
            ExecutorOperation::Update { .. } => "Update",
×
75
            ExecutorOperation::Insert { .. } => "Insert",
×
76
            ExecutorOperation::Terminate { .. } => "Terminate",
×
77
            ExecutorOperation::Commit { .. } => "Commit",
×
78
        };
×
79
        f.write_str(type_str)
×
80
    }
×
81
}
×
82

×
83
mod name;
84
mod node;
85
mod processor_node;
86
mod receiver_loop;
87
mod sink_node;
88
mod source_node;
89

90
use node::Node;
91
use processor_node::ProcessorNode;
×
92
use sink_node::SinkNode;
×
93

×
94
use self::source_node::{SourceListenerNode, SourceSenderNode};
95

96
use super::epoch::OpIdentifier;
97

98
/// Drives execution of a [`Dag`]: spawns worker threads for every source,
/// processor and sink node, wires them together with bounded channels, and
/// tracks their `JoinHandle`s until completion.
pub struct DagExecutor<'a, T: Clone> {
    // The DAG being executed, borrowed for the executor's lifetime.
    dag: &'a Dag<T>,
    // Per-node input/output schemas, loaded or initialized from metadata.
    schemas: HashMap<NodeHandle, NodeSchemas<T>>,
    // Shared record readers, keyed by node handle and then by port.
    record_stores: Arc<RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>>,
    // Join handles of spawned node threads; entries are removed in `join()`.
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
    // Base directory for node state/metadata.
    path: PathBuf,
    options: ExecutorOptions,
    // Cooperative shutdown flag shared with every spawned thread.
    running: Arc<AtomicBool>,
    // Per-source checkpoint to resume from (`None` = start from scratch).
    consistency_metadata: HashMap<NodeHandle, Option<OpIdentifier>>,
}
108

109
impl<'a, T: Clone + 'a + 'static> DagExecutor<'a, T> {
110
    fn check_consistency(
60✔
111
        dag: &'a Dag<T>,
60✔
112
        path: &Path,
60✔
113
    ) -> Result<HashMap<NodeHandle, Option<OpIdentifier>>, ExecutionError> {
60✔
114
        let mut r = HashMap::new();
60✔
115
        let meta = DagMetadataManager::new(dag, path)?;
60✔
116
        let chk = meta.get_checkpoint_consistency();
60✔
117
        for (handle, _factory) in dag.sources() {
73✔
118
            match chk.get(handle) {
73✔
119
                Some(Consistency::FullyConsistent(c)) => {
73✔
120
                    r.insert(handle.clone(), *c);
73✔
121
                }
73✔
122
                _ => return Err(InconsistentCheckpointMetadata),
×
123
            }
×
124
        }
×
125
        Ok(r)
60✔
126
    }
60✔
127

×
128
    /// Creates an executor for `dag`, storing all node state under `path`.
    ///
    /// If the persisted checkpoint metadata is consistent, every source is
    /// scheduled to resume from its recorded position; otherwise all metadata
    /// is deleted and every source starts from scratch (`None`).
    ///
    /// # Errors
    /// Fails if the metadata manager cannot be created or if schemas
    /// persisted under `path` are incompatible with the current DAG.
    pub fn new(
        dag: &'a Dag<T>,
        path: &Path,
        options: ExecutorOptions,
        running: Arc<AtomicBool>,
    ) -> Result<Self, ExecutionError> {
        // Any consistency failure falls back to a clean start: wipe the
        // metadata and resume every source from the beginning.
        let consistency_metadata = match Self::check_consistency(dag, path) {
            Ok(c) => c,
            Err(_) => {
                DagMetadataManager::new(dag, path)?.delete_metadata();
                dag.sources()
                    .map(|(handle, _)| (handle.clone(), None))
                    .collect()
            }
        };

        let schemas = Self::load_or_init_schema(dag, path)?;

        Ok(Self {
            dag,
            schemas,
            // Pre-create one (initially empty) record-reader map per node.
            record_stores: Arc::new(RwLock::new(
                dag.node_handles()
                    .iter()
                    .map(|node_handle| {
                        (
                            node_handle.clone(),
                            HashMap::<PortHandle, Box<dyn RecordReader>>::new(),
                        )
                    })
                    .collect(),
            )),
            path: path.to_path_buf(),
            join_handles: HashMap::new(),
            options,
            running,
            consistency_metadata,
        })
    }
60✔
169

×
170
    pub fn validate(dag: &'a Dag<T>, path: &Path) -> Result<(), ExecutionError> {
×
171
        Self::load_or_init_schema(dag, path).map(|_| ())
×
172
    }
×
173

×
174
    fn validate_schemas(
10✔
175
        current: &NodeSchemas<T>,
10✔
176
        existing: &DagMetadata,
10✔
177
    ) -> Result<(), ExecutionError> {
10✔
178
        if existing.output_schemas.len() != current.output_schemas.len() {
10✔
179
            return Err(IncompatibleSchemas());
×
180
        }
10✔
181
        for (port, (schema, _ctx)) in &current.output_schemas {
17✔
182
            let other_schema = existing
8✔
183
                .output_schemas
8✔
184
                .get(port)
8✔
185
                .ok_or(IncompatibleSchemas())?;
8✔
186
            if schema != other_schema {
8✔
187
                return Err(IncompatibleSchemas());
1✔
188
            }
7✔
189
        }
×
190
        if existing.input_schemas.len() != current.input_schemas.len() {
9✔
191
            return Err(IncompatibleSchemas());
×
192
        }
9✔
193
        for (port, (schema, _ctx)) in &current.output_schemas {
16✔
194
            let other_schema = existing
7✔
195
                .output_schemas
7✔
196
                .get(port)
7✔
197
                .ok_or(IncompatibleSchemas())?;
7✔
198
            if schema != other_schema {
7✔
199
                return Err(IncompatibleSchemas());
×
200
            }
7✔
201
        }
×
202
        Ok(())
9✔
203
    }
10✔
204

×
205
    /// Builds the current schema set for `dag` and reconciles it with the
    /// metadata persisted under `path`.
    ///
    /// * When persisted metadata exists for a node, its schemas must
    ///   validate against the current ones.
    /// * When metadata is unreadable, or missing for some node, all metadata
    ///   is deleted and re-initialized from the current schemas.
    ///
    /// Returns a clone of the current schemas on success.
    fn load_or_init_schema(
        dag: &'a Dag<T>,
        path: &Path,
    ) -> Result<HashMap<NodeHandle, NodeSchemas<T>>, ExecutionError> {
        let dag_schemas = DagSchemas::new(dag)?;
        let meta_manager = DagMetadataManager::new(dag, path)?;

        let current_schemas = dag_schemas.get_all_schemas();
        match meta_manager.get_metadata() {
            Ok(existing_schemas) => {
                for (handle, current) in current_schemas {
                    if let Some(existing) = existing_schemas.get(handle) {
                        Self::validate_schemas(current, existing)?;
                    } else {
                        // NOTE(review): this wipes and re-initializes *all*
                        // metadata yet keeps iterating, so it can run once per
                        // missing node and later iterations still validate
                        // against the stale `existing_schemas` snapshot —
                        // confirm whether a `break` is intended here.
                        meta_manager.delete_metadata();
                        meta_manager.init_metadata(current_schemas)?;
                    }
                }
            }
            Err(_) => {
                // Unreadable metadata: start from a clean slate.
                meta_manager.delete_metadata();
                meta_manager.init_metadata(current_schemas)?;
            }
        };

        Ok(current_schemas.clone())
    }
60✔
232

233
    /// Spawns the two threads backing a single source node:
    ///
    /// 1. a `{handle}-sender` thread running a `SourceSenderNode` built from
    ///    `src_factory`, which feeds an internal bounded channel, and
    /// 2. a `{handle}-listener` thread running a `SourceListenerNode`, which
    ///    drains that channel and forwards to the downstream `senders`.
    ///
    /// Only the listener's `JoinHandle` is returned; the sender's handle is
    /// deliberately dropped, so that thread runs detached. Errors from either
    /// thread are re-raised as panics only while `running` is still set —
    /// errors during an orderly shutdown are swallowed.
    fn start_source(
        &self,
        handle: NodeHandle,
        src_factory: Arc<dyn SourceFactory<T>>,
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
        epoch_manager: Arc<EpochManager>,
        start_barrier: Arc<Barrier>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        // Internal channel between the sender thread and the listener thread.
        let (sender, receiver) = bounded(self.options.channel_buffer_sz);

        // Checkpoint position this source resumes from (None = from scratch).
        let start_seq = *self
            .consistency_metadata
            .get(&handle)
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?;
        let output_ports = src_factory.get_output_ports()?;

        let st_node_handle = handle.clone();
        // Keep only the Schema per port, dropping the context component.
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let running = self.running.clone();
        let running_source = running.clone();
        let source_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let sender = SourceSenderNode::new(
                handle,
                &*src_factory,
                output_schemas,
                start_seq,
                sender,
                running,
            )?;
            sender.run()
        };

        // The sender thread's JoinHandle is intentionally discarded: only the
        // listener below is tracked by the executor.
        let _st_handle = Builder::new()
            .name(format!("{handle}-sender"))
            .spawn(move || {
                if let Err(e) = source_fn(st_node_handle) {
                    // Errors are fatal only while we are still running.
                    if running_source.load(Ordering::Relaxed) {
                        std::panic::panic_any(e);
                    }
                }
            })?;

        let timeout = self.options.commit_time_threshold;
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        let edges = self.dag.edges().cloned().collect::<Vec<_>>();
        let running = self.running.clone();
        let running_listener = running.clone();
        let commit_sz = self.options.commit_sz;
        // NOTE(review): `timeout` above and this value both read
        // `commit_time_threshold` — confirm whether two distinct options
        // were intended.
        let max_duration_between_commits = self.options.commit_time_threshold;
        // Same Schema-only projection as above, for the listener side.
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        // One slot beyond the channel capacity.
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let source_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let listener = SourceListenerNode::new(
                handle,
                receiver,
                timeout,
                &base_path,
                &output_ports,
                record_readers,
                senders,
                &edges,
                running,
                commit_sz,
                max_duration_between_commits,
                epoch_manager,
                output_schemas,
                retention_queue_size,
            )?;
            // Rendezvous with the other source listeners before running, so
            // none starts ahead of the others (barrier is sized to the
            // number of sources in `start()`).
            start_barrier.wait();
            listener.run()
        };
        Ok(Builder::new()
            .name(format!("{handle}-listener"))
            .spawn(move || {
                if let Err(e) = source_fn(handle) {
                    if running_listener.load(Ordering::Relaxed) {
                        std::panic::panic_any(e);
                    }
                }
            })?)
    }
67✔
327

×
328
    /// Spawns the worker thread for one processor node.
    ///
    /// The thread builds a `ProcessorNode` from `proc_factory`, wires it to
    /// the given `receivers` (inputs) and `senders` (outputs), and runs it to
    /// completion. An error result panics the thread only while `running` is
    /// still set; errors during shutdown are swallowed.
    pub fn start_processor(
        &self,
        handle: NodeHandle,
        proc_factory: Arc<dyn ProcessorFactory<T>>,
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        let edges = self.dag.edges().cloned().collect::<Vec<_>>();
        // Keep only the Schema per port, dropping the context component.
        let input_schemas: HashMap<PortHandle, Schema> = schemas
            .input_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let running = self.running.clone();
        // One slot beyond the channel capacity.
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let processor_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let processor = ProcessorNode::new(
                handle,
                &*proc_factory,
                &base_path,
                record_readers,
                receivers,
                senders,
                &edges,
                input_schemas,
                output_schemas,
                retention_queue_size,
            )?;
            processor.run()
        };
        Ok(Builder::new().name(handle.to_string()).spawn(move || {
            if let Err(e) = processor_fn(handle) {
                // Suppress errors that happen after a shutdown request.
                if running.load(Ordering::Relaxed) {
                    std::panic::panic_any(e);
                }
            }
        })?)
    }
112✔
376

×
377
    /// Spawns the worker thread for one sink node.
    ///
    /// The thread builds a `SinkNode` from `snk_factory`, attaches the input
    /// `receivers`, and runs it to completion.
    ///
    /// NOTE(review): unlike `start_source`/`start_processor`, a sink error
    /// always panics the thread, even after `running` has been cleared —
    /// confirm whether sinks should also stay quiet during shutdown.
    pub fn start_sink(
        &self,
        handle: NodeHandle,
        snk_factory: Arc<dyn SinkFactory<T>>,
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        // Keep only the Schema per port, dropping the context component.
        let input_schemas: HashMap<PortHandle, Schema> = schemas
            .input_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        // One slot beyond the channel capacity.
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let snk_fn = move |handle| -> Result<(), ExecutionError> {
            let sink = SinkNode::new(
                handle,
                &*snk_factory,
                &base_path,
                record_readers,
                receivers,
                input_schemas,
                retention_queue_size,
            )?;
            sink.run()
        };
        Ok(Builder::new().name(handle.to_string()).spawn(|| {
            if let Err(e) = snk_fn(handle) {
                std::panic::panic_any(e);
            }
        })?)
    }
61✔
411

×
412
    /// Starts the whole DAG: builds all inter-node channels, then spawns
    /// sinks first, processors next, and sources last.
    ///
    /// Every spawned thread's `JoinHandle` is stored in `self.join_handles`
    /// for later collection by `join()`.
    pub fn start(&mut self) -> Result<(), ExecutionError> {
        // One channel per DAG edge; both maps are keyed by node handle.
        let (mut senders, mut receivers) = index_edges(self.dag, self.options.channel_buffer_sz);

        for (handle, factory) in self.dag.sinks() {
            let join_handle = self.start_sink(
                handle.clone(),
                factory.clone(),
                receivers.remove(handle).expect("BUG in DagExecutor"),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }

        for (handle, factory) in self.dag.processors() {
            let join_handle = self.start_processor(
                handle.clone(),
                factory.clone(),
                senders.remove(handle).expect("BUG in DagExecutor"),
                receivers.remove(handle).expect("BUG in DagExecutor"),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }

        // The epoch manager and the start barrier are shared by all sources;
        // the barrier holds every source listener back until all are built.
        let num_sources = self.dag.sources().count();
        let epoch_manager: Arc<EpochManager> = Arc::new(EpochManager::new(num_sources));

        let start_barrier = Arc::new(Barrier::new(num_sources));

        for (handle, factory) in self.dag.sources() {
            let join_handle = self.start_source(
                handle.clone(),
                factory.clone(),
                senders.remove(handle).expect("BUG in DagExecutor"),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
                epoch_manager.clone(),
                start_barrier.clone(),
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }
        Ok(())
    }
57✔
460

×
461
    /// Requests a cooperative shutdown: clears the shared `running` flag so
    /// every spawned node thread can observe it and wind down.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
1✔
464

×
465
    pub fn join(mut self) -> Result<(), ExecutionError> {
56✔
466
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
237✔
467

×
468
        loop {
×
469
            for handle in &handles {
2,128✔
470
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
1,722✔
471
                    if entry.get().is_finished() {
1,715✔
472
                        if let Err(e) = entry.remove().join() {
217✔
473
                            panic_any(e);
9✔
474
                        }
208✔
475
                    }
1,498✔
476
                }
7✔
477
            }
×
478

×
479
            if self.join_handles.is_empty() {
406✔
480
                return Ok(());
47✔
481
            }
359✔
482

359✔
483
            thread::sleep(Duration::from_millis(250));
359✔
484
        }
×
485
    }
47✔
486
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc