• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.2
/dozer-sql/src/pipeline/projection/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    errors::ExecutionError,
5
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
6
    DEFAULT_PORT_HANDLE,
7
};
8
use dozer_types::types::{FieldDefinition, Schema};
9
use sqlparser::ast::{Expr, Ident, SelectItem};
10

11
use crate::pipeline::builder::SchemaSQLContext;
12
use crate::pipeline::{
13
    errors::PipelineError,
14
    expression::{
15
        builder::ExpressionBuilder, execution::Expression, execution::ExpressionExecutor,
16
    },
17
};
18

19
use super::processor::ProjectionProcessor;
20

21
#[derive(Debug)]
×
22
pub struct ProjectionProcessorFactory {
23
    select: Vec<SelectItem>,
24
}
25

26
impl ProjectionProcessorFactory {
27
    /// Creates a new [`ProjectionProcessorFactory`].
28
    pub fn _new(select: Vec<SelectItem>) -> Self {
1,121✔
29
        Self { select }
1,121✔
30
    }
1,121✔
31
}
32

33
impl ProcessorFactory<SchemaSQLContext> for ProjectionProcessorFactory {
34
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
35
        vec![DEFAULT_PORT_HANDLE]
×
36
    }
×
37

38
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
39
        vec![OutputPortDef::new(
×
40
            DEFAULT_PORT_HANDLE,
×
41
            OutputPortType::Stateless,
×
42
        )]
×
43
    }
×
44

45
    fn get_output_schema(
1,118✔
46
        &self,
1,118✔
47
        _output_port: &PortHandle,
1,118✔
48
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1,118✔
49
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
1,118✔
50
        let (input_schema, context) = input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap();
1,118✔
51

1,118✔
52
        let mut select_expr: Vec<(String, Expression)> = vec![];
1,118✔
53
        for s in self.select.iter() {
1,121✔
54
            match s {
1,121✔
55
                SelectItem::Wildcard(_) => {
56
                    let fields: Vec<SelectItem> = input_schema
×
57
                        .fields
×
58
                        .iter()
×
59
                        .map(|col| {
2✔
60
                            SelectItem::UnnamedExpr(Expr::Identifier(Ident::new(
2✔
61
                                col.to_owned().name,
2✔
62
                            )))
2✔
63
                        })
2✔
64
                        .collect();
×
65
                    for f in fields {
2✔
66
                        let res = parse_sql_select_item(&f, input_schema);
1✔
67
                        if let Ok(..) = res {
1✔
68
                            select_expr.push(res.unwrap())
2✔
69
                        }
×
70
                    }
71
                }
72
                _ => {
73
                    let res = parse_sql_select_item(s, input_schema);
1,121✔
74
                    if let Ok(..) = res {
1,121✔
75
                        select_expr.push(res.unwrap())
1,121✔
76
                    }
×
77
                }
78
            }
79
        }
80

81
        let mut output_schema = input_schema.clone();
1,120✔
82
        let mut fields = vec![];
1,120✔
83
        for e in select_expr.iter() {
1,123✔
84
            let field_name = e.0.clone();
1,123✔
85
            let field_type =
1,122✔
86
                e.1.get_type(input_schema)
1,123✔
87
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
1,123✔
88
            fields.push(FieldDefinition::new(
1,122✔
89
                field_name,
1,122✔
90
                field_type.return_type,
1,122✔
91
                field_type.nullable,
1,122✔
92
                field_type.source,
1,122✔
93
            ));
1,122✔
94
        }
95
        output_schema.fields = fields;
1,120✔
96

1,120✔
97
        Ok((output_schema, context.clone()))
1,120✔
98
    }
1,121✔
99

100
    fn build(
1,118✔
101
        &self,
1,118✔
102
        input_schemas: HashMap<PortHandle, Schema>,
1,118✔
103
        _output_schemas: HashMap<PortHandle, Schema>,
1,118✔
104
    ) -> Result<Box<dyn Processor>, ExecutionError> {
1,118✔
105
        let schema = match input_schemas.get(&DEFAULT_PORT_HANDLE) {
1,118✔
106
            Some(schema) => Ok(schema),
1,118✔
107
            None => Err(ExecutionError::InternalStringError(
×
108
                "Invalid Projection input port".to_string(),
×
109
            )),
×
110
        }?;
×
111

112
        match self
1,118✔
113
            .select
1,118✔
114
            .iter()
1,118✔
115
            .map(|item| parse_sql_select_item(item, schema))
1,118✔
116
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
1,118✔
117
        {
118
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
1,118✔
119
                schema.clone(),
1,118✔
120
                expressions.into_iter().map(|e| e.1).collect(),
1,118✔
121
            ))),
1,118✔
122
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
123
        }
124
    }
1,118✔
125
}
126

127
pub(crate) fn parse_sql_select_item(
2,237✔
128
    sql: &SelectItem,
2,237✔
129
    schema: &Schema,
2,237✔
130
) -> Result<(String, Expression), PipelineError> {
2,237✔
131
    match sql {
2,237✔
132
        SelectItem::UnnamedExpr(sql_expr) => {
2,235✔
133
            match ExpressionBuilder::new(0).parse_sql_expression(true, sql_expr, schema) {
2,235✔
134
                Ok(expr) => Ok((sql_expr.to_string(), expr)),
2,239✔
135
                Err(error) => Err(error),
×
136
            }
137
        }
138
        SelectItem::ExprWithAlias { expr, alias } => {
2✔
139
            match ExpressionBuilder::new(0).parse_sql_expression(true, expr, schema) {
2✔
140
                Ok(expr) => Ok((alias.value.clone(), expr)),
2✔
141
                Err(error) => Err(error),
×
142
            }
143
        }
144
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidOperator("*".to_string())),
×
145
        SelectItem::QualifiedWildcard(ref object_name, ..) => {
×
146
            Err(PipelineError::InvalidOperator(object_name.to_string()))
×
147
        }
148
    }
149
}
2,241✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc