• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/simple/basic_processor_factory.rs
1
use dozer_core::dag::channels::ProcessorChannelForwarder;
2
use std::collections::HashMap;
3

4
use dozer_core::dag::epoch::Epoch;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
8
};
9
use dozer_core::dag::record_store::RecordReader;
10
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
11
use dozer_sql::pipeline::builder::SchemaSQLContext;
12
use dozer_types::types::Schema;
13

14
#[derive(Debug)]
×
15
pub struct BasicProcessorFactory {}
16

17
impl BasicProcessorFactory {
18
    /// Creates a new [`BasixProcessorFactory`].
19
    pub fn new() -> Self {
×
20
        Self {}
×
21
    }
×
22
}
23

24
pub(crate) const BASIC_PROCESSOR_INPUT_PORT: PortHandle = 0xffff_u16;
25
pub(crate) const BASIC_PROCESSOR_OUTPUT_PORT: PortHandle = 0xffff_u16;
26

27
impl ProcessorFactory<SchemaSQLContext> for BasicProcessorFactory {
28
    fn get_output_schema(
×
29
        &self,
×
30
        _output_port: &PortHandle,
×
31
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
32
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
33
        Ok(input_schemas
×
34
            .get(&BASIC_PROCESSOR_INPUT_PORT)
×
35
            .unwrap()
×
36
            .clone())
×
37
    }
×
38

39
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
40
        vec![BASIC_PROCESSOR_INPUT_PORT]
×
41
    }
×
42
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
43
        vec![OutputPortDef::new(
×
44
            BASIC_PROCESSOR_OUTPUT_PORT,
×
45
            OutputPortType::StatefulWithPrimaryKeyLookup {
×
46
                retr_old_records_for_deletes: true,
×
47
                retr_old_records_for_updates: true,
×
48
            },
×
49
        )]
×
50
    }
×
51

52
    fn prepare(
×
53
        &self,
×
54
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
55
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
56
    ) -> Result<(), ExecutionError> {
×
57
        Ok(())
×
58
    }
×
59

60
    fn build(
×
61
        &self,
×
62
        _input_schemas: HashMap<PortHandle, Schema>,
×
63
        _output_schemas: HashMap<PortHandle, Schema>,
×
64
    ) -> Result<Box<dyn Processor>, ExecutionError> {
×
65
        Ok(Box::new(BasicProcessor {}))
×
66
    }
×
67
}
68

69
#[derive(Debug)]
×
70
pub(crate) struct BasicProcessor {}
71

72
impl Processor for BasicProcessor {
73
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
74
        Ok(())
×
75
    }
×
76

77
    fn commit(
×
78
        &self,
×
79
        _epoch_details: &Epoch,
×
80
        _tx: &SharedTransaction,
×
81
    ) -> Result<(), ExecutionError> {
×
82
        Ok(())
×
83
    }
×
84

85
    fn process(
×
86
        &mut self,
×
87
        _from_port: PortHandle,
×
88
        op: dozer_types::types::Operation,
×
89
        fw: &mut dyn ProcessorChannelForwarder,
×
90
        _tx: &SharedTransaction,
×
91
        _readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
92
    ) -> Result<(), ExecutionError> {
×
93
        fw.send(op, BASIC_PROCESSOR_OUTPUT_PORT)
×
94
    }
×
95
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc