• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.57
/dozer-core/src/dag/tests/sinks.rs
1
use crate::dag::epoch::Epoch;
2
use crate::dag::errors::ExecutionError;
3
use crate::dag::node::{PortHandle, Sink, SinkFactory};
4
use crate::dag::record_store::RecordReader;
5
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
6
use dozer_types::types::{Operation, Schema};
7

8
use dozer_types::log::info;
9
use std::collections::HashMap;
10

11
use crate::dag::tests::app::NoneContext;
12
use std::sync::atomic::{AtomicBool, Ordering};
13
use std::sync::Arc;
14

15
/// Port handle returned by `CountingSinkFactory::get_input_ports` as its only input port.
pub(crate) const COUNTING_SINK_INPUT_PORT: PortHandle = 90;
16

17
/// Test factory that builds `CountingSink` instances.
///
/// It carries the operation count and the shared flag that are handed to
/// every sink it builds.
#[derive(Debug)]
×
18
pub(crate) struct CountingSinkFactory {
19
    // Operation count at which a built sink clears `running`.
    expected: u64,
20
    // Shared flag; built sinks store `false` into it once `expected` is reached.
    running: Arc<AtomicBool>,
21
}
22

23
impl CountingSinkFactory {
24
    /// Creates a factory from an expected operation count and a shared flag.
    ///
    /// `barrier` is stored as the `running` field and cloned into each sink
    /// produced by `build`.
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
26✔
25
        Self {
26✔
26
            expected,
26✔
27
            running: barrier,
26✔
28
        }
26✔
29
    }
26✔
30
}
31

32
impl SinkFactory<NoneContext> for CountingSinkFactory {
33
    /// Returns the single input port, `COUNTING_SINK_INPUT_PORT`.
    fn get_input_ports(&self) -> Vec<PortHandle> {
56✔
34
        vec![COUNTING_SINK_INPUT_PORT]
56✔
35
    }
56✔
36

×
37
    /// No-op: the input schemas are ignored and `Ok(())` is always returned.
    fn prepare(
×
38
        &self,
×
39
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
40
    ) -> Result<(), ExecutionError> {
×
41
        Ok(())
×
42
    }
×
43

44
    /// Builds a boxed `CountingSink` whose counter starts at zero.
    ///
    /// The input schemas are ignored; the sink receives this factory's
    /// `expected` value and a clone of its `running` handle.
    fn build(
26✔
45
        &self,
26✔
46
        _input_schemas: HashMap<PortHandle, Schema>,
26✔
47
    ) -> Result<Box<dyn Sink>, ExecutionError> {
26✔
48
        Ok(Box::new(CountingSink {
26✔
49
            expected: self.expected,
26✔
50
            current: 0,
26✔
51
            // Cloning the `Arc` shares the same flag with the factory's creator.
            running: self.running.clone(),
26✔
52
        }))
26✔
53
    }
26✔
54
}
×
55

×
56
/// Test sink that counts the operations passed to `process` and clears a
/// shared flag when the count reaches `expected`.
#[derive(Debug)]
×
57
pub(crate) struct CountingSink {
×
58
    // Count at which `running` is set to `false`.
    expected: u64,
×
59
    // Number of `process` calls seen so far.
    current: u64,
×
60
    // Shared flag cleared once `current == expected`.
    // NOTE(review): presumably polled by the sending side to stop — confirm against callers.
    running: Arc<AtomicBool>,
×
61
}
62
impl Sink for CountingSink {
63
    /// No-op: the environment manager is ignored and `Ok(())` is returned.
    fn init(&mut self, _state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
25✔
64
        Ok(())
25✔
65
    }
25✔
66

67
    /// No-op commit: both arguments are ignored and `Ok(())` is returned.
    fn commit(
1,028✔
68
        &mut self,
1,028✔
69
        _epoch_details: &Epoch,
1,028✔
70
        _tx: &SharedTransaction,
1,028✔
71
    ) -> Result<(), ExecutionError> {
1,028✔
72
        // NOTE(review): the disabled block below repeats the completion check
        // that `process` performs; it is inactive here.
        // if self.current == self.expected {
1,028✔
73
        //     info!(
1,028✔
74
        //         "Received {} messages. Notifying sender to exit!",
1,028✔
75
        //         self.current
1,028✔
76
        //     );
1,028✔
77
        //     self.running.store(false, Ordering::Relaxed);
1,028✔
78
        // }
1,028✔
79
        Ok(())
1,028✔
80
    }
1,028✔
81

×
82
    /// Counts one operation and signals completion when the target is hit.
    ///
    /// The port, operation, transaction and readers are all ignored; only the
    /// number of calls matters. Always returns `Ok(())`.
    fn process(
3,276,804✔
83
        &mut self,
3,276,804✔
84
        _from_port: PortHandle,
3,276,804✔
85
        _op: Operation,
3,276,804✔
86
        _state: &SharedTransaction,
3,276,804✔
87
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
3,276,804✔
88
    ) -> Result<(), ExecutionError> {
3,276,804✔
89
        self.current += 1;
3,276,804✔
90
        // Exact equality: the flag is cleared only on the call that brings the
        // counter to `expected`, not on any later call.
        if self.current == self.expected {
3,276,804✔
91
            info!(
14✔
92
                "Received {} messages. Notifying sender to exit!",
×
93
                self.current
×
94
            );
×
95
            // Clears the shared flag using relaxed atomic ordering.
            self.running.store(false, Ordering::Relaxed);
14✔
96
        }
3,276,790✔
97
        Ok(())
3,276,804✔
98
    }
3,276,804✔
99
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc