• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/pipeline/streaming_sink.rs
1
use dozer_core::{
2
    dag::{
3
        dag::DEFAULT_PORT_HANDLE,
4
        epoch::Epoch,
5
        errors::ExecutionError,
6
        node::{PortHandle, Sink, SinkFactory},
7
        record_store::RecordReader,
8
    },
9
    storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction},
10
};
11
use dozer_sql::pipeline::builder::SchemaSQLContext;
12
use dozer_types::{
13
    crossbeam,
14
    log::debug,
15
    types::{Operation, Schema},
16
};
17
use std::collections::HashMap;
18

19
#[derive(Debug)]
×
20
pub(crate) struct StreamingSinkFactory {
21
    sender: crossbeam::channel::Sender<Operation>,
22
}
23

24
impl StreamingSinkFactory {
25
    pub fn new(sender: crossbeam::channel::Sender<Operation>) -> Self {
×
26
        Self { sender }
×
27
    }
×
28
}
29

30
impl SinkFactory<SchemaSQLContext> for StreamingSinkFactory {
31
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
32
        vec![DEFAULT_PORT_HANDLE]
×
33
    }
×
34

35
    fn build(
×
36
        &self,
×
37
        _input_schemas: HashMap<PortHandle, Schema>,
×
38
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
39
        Ok(Box::new(StreamingSink {
×
40
            current: 0,
×
41
            sender: self.sender.clone(),
×
42
        }))
×
43
    }
×
44

×
45
    fn prepare(
×
46
        &self,
×
47
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
48
    ) -> Result<(), ExecutionError> {
×
49
        Ok(())
×
50
    }
×
51
}
52

×
53
#[derive(Debug)]
×
54
pub struct StreamingSink {
×
55
    current: u64,
×
56
    sender: crossbeam::channel::Sender<Operation>,
×
57
}
×
58

59
impl Sink for StreamingSink {
60
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
61
        debug!("SINK: Initialising StreamingSink");
×
62
        Ok(())
×
63
    }
×
64

65
    fn process(
×
66
        &mut self,
×
67
        _from_port: PortHandle,
×
68
        op: Operation,
×
69
        _state: &SharedTransaction,
×
70
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
71
    ) -> Result<(), ExecutionError> {
×
72
        self.current += 1;
×
73
        let _res = self
×
74
            .sender
×
75
            .try_send(op)
×
76
            .map_err(|e| ExecutionError::InternalError(Box::new(e)));
×
77

×
78
        Ok(())
×
79
    }
×
80

×
81
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
82
        Ok(())
×
83
    }
×
84
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc