• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.37
/dozer-sql/src/pipeline/projection/processor.rs
1
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
2

3
use dozer_core::channels::ProcessorChannelForwarder;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::errors::ExecutionError;
6
use dozer_core::errors::ExecutionError::InternalError;
7
use dozer_core::node::{PortHandle, Processor};
8
use dozer_core::DEFAULT_PORT_HANDLE;
9
use dozer_types::types::{Operation, Record, Schema};
10

11
#[derive(Debug)]
×
12
pub struct ProjectionProcessor {
13
    expressions: Vec<Expression>,
14
    input_schema: Schema,
15
}
16

17
impl ProjectionProcessor {
18
    pub fn new(input_schema: Schema, expressions: Vec<Expression>) -> Self {
1,466✔
19
        Self {
1,466✔
20
            input_schema,
1,466✔
21
            expressions,
1,466✔
22
        }
1,466✔
23
    }
1,466✔
24

25
    fn delete(&mut self, record: &Record) -> Result<Operation, ExecutionError> {
294✔
26
        let mut results = vec![];
294✔
27

28
        for expr in &self.expressions {
870✔
29
            results.push(
30
                expr.evaluate(record, &self.input_schema)
576✔
31
                    .map_err(|e| InternalError(Box::new(e)))?,
576✔
32
            );
33
        }
34
        Ok(Operation::Delete {
294✔
35
            old: Record::new(None, results),
294✔
36
        })
294✔
37
    }
294✔
38

39
    fn insert(&mut self, record: &Record) -> Result<Operation, ExecutionError> {
3,698✔
40
        let mut results = vec![];
3,698✔
41

42
        for expr in self.expressions.clone() {
6,734✔
43
            results.push(
44
                expr.evaluate(record, &self.input_schema)
6,734✔
45
                    .map_err(|e| InternalError(Box::new(e)))?,
6,734✔
46
            );
47
        }
48
        Ok(Operation::Insert {
3,692✔
49
            new: Record::new(None, results),
3,692✔
50
        })
3,692✔
51
    }
3,692✔
52

53
    fn update(&self, old: &Record, new: &Record) -> Result<Operation, ExecutionError> {
×
54
        let mut old_results = vec![];
×
55
        let mut new_results = vec![];
×
56

57
        for expr in &self.expressions {
×
58
            old_results.push(
59
                expr.evaluate(old, &self.input_schema)
×
60
                    .map_err(|e| InternalError(Box::new(e)))?,
×
61
            );
62
            new_results.push(
63
                expr.evaluate(new, &self.input_schema)
×
64
                    .map_err(|e| InternalError(Box::new(e)))?,
×
65
            );
66
        }
67

68
        Ok(Operation::Update {
×
69
            old: Record::new(None, old_results),
×
70
            new: Record::new(None, new_results),
×
71
        })
×
72
    }
×
73
}
74

75
impl Processor for ProjectionProcessor {
76
    fn process(
3,986✔
77
        &mut self,
3,986✔
78
        _from_port: PortHandle,
3,986✔
79
        op: Operation,
3,986✔
80
        fw: &mut dyn ProcessorChannelForwarder,
3,986✔
81
    ) -> Result<(), ExecutionError> {
3,986✔
82
        let _ = match op {
3,986✔
83
            Operation::Delete { ref old } => fw.send(self.delete(old)?, DEFAULT_PORT_HANDLE),
288✔
84
            Operation::Insert { ref new } => fw.send(self.insert(new)?, DEFAULT_PORT_HANDLE),
3,698✔
85
            Operation::Update { ref old, ref new } => {
×
86
                fw.send(self.update(old, new)?, DEFAULT_PORT_HANDLE)
×
87
            }
88
        };
89
        Ok(())
3,986✔
90
    }
3,986✔
91

92
    fn commit(&self, _epoch: &Epoch) -> Result<(), ExecutionError> {
372✔
93
        Ok(())
372✔
94
    }
372✔
95
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc