• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698125137

pending completion
4698125137

Pull #1432

github

GitHub
Merge ef21858a9 into ca6ea5f58
Pull Request #1432: chore: Add deb generation to release process

34321 of 44357 relevant lines covered (77.37%)

16844.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.59
/dozer-sql/src/pipeline/planner/projection.rs
1
#![allow(dead_code)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::ExpressionBuilder;
5
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
6
use dozer_types::types::{FieldDefinition, Schema};
7
use sqlparser::ast::{Expr, Ident, Select, SelectItem};
8

9
#[derive(Clone, Copy)]
×
10
pub enum PrimaryKeyAction {
11
    Retain,
12
    Drop,
13
    Force,
14
}
15

16
pub struct CommonPlanner {
17
    input_schema: Schema,
18
    pub post_aggregation_schema: Schema,
19
    pub post_projection_schema: Schema,
20
    // Vector of aggregations to be appended to the original record
21
    pub aggregation_output: Vec<Expression>,
22
    pub having: Option<Expression>,
23
    pub groupby: Vec<Expression>,
24
    pub projection_output: Vec<Expression>,
25
}
26

27
impl CommonPlanner {
28
    fn append_to_schema(
3,326✔
29
        expr: &Expression,
3,326✔
30
        alias: Option<String>,
3,326✔
31
        input_schema: &Schema,
3,326✔
32
        output_schema: &mut Schema,
3,326✔
33
    ) -> Result<(), PipelineError> {
3,326✔
34
        let expr_type = expr.get_type(input_schema)?;
3,326✔
35
        output_schema.fields.push(FieldDefinition::new(
3,326✔
36
            alias.unwrap_or_else(|| expr.to_string(input_schema)),
3,326✔
37
            expr_type.return_type,
3,326✔
38
            expr_type.nullable,
3,326✔
39
            expr_type.source,
3,326✔
40
        ));
3,326✔
41

3,326✔
42
        Ok(())
3,326✔
43
    }
3,326✔
44

45
    fn add_select_item(&mut self, item: SelectItem) -> Result<(), PipelineError> {
2,767✔
46
        let expr_items: Vec<(Expr, Option<String>)> = match item {
2,767✔
47
            SelectItem::UnnamedExpr(expr) => vec![(expr, None)],
2,477✔
48
            SelectItem::ExprWithAlias { expr, alias } => vec![(expr, Some(alias.value))],
254✔
49
            SelectItem::QualifiedWildcard(_, _) => panic!("not supported yet"),
×
50
            SelectItem::Wildcard(_) => self
36✔
51
                .input_schema
36✔
52
                .fields
36✔
53
                .iter()
36✔
54
                .map(|col| (Expr::Identifier(Ident::new(col.to_owned().name)), None))
96✔
55
                .collect(),
36✔
56
        };
57

58
        for (expr, alias) in expr_items {
5,592✔
59
            let mut builder = ExpressionBuilder::new(
2,826✔
60
                self.input_schema.fields.len() + self.aggregation_output.len(),
2,826✔
61
            );
2,826✔
62
            let projection_expression = builder.build(true, &expr, &self.input_schema)?;
2,826✔
63

64
            for new_aggr in builder.aggregations {
3,273✔
65
                Self::append_to_schema(
447✔
66
                    &new_aggr,
447✔
67
                    alias.clone(),
447✔
68
                    &self.input_schema,
447✔
69
                    &mut self.post_aggregation_schema,
447✔
70
                )?;
447✔
71
                self.aggregation_output.push(new_aggr);
447✔
72
            }
73

74
            self.projection_output.push(projection_expression.clone());
2,826✔
75
            Self::append_to_schema(
2,826✔
76
                &projection_expression,
2,826✔
77
                alias,
2,826✔
78
                &self.post_aggregation_schema,
2,826✔
79
                &mut self.post_projection_schema,
2,826✔
80
            )?;
2,826✔
81
        }
82

83
        Ok(())
2,766✔
84
    }
2,766✔
85

86
    fn add_join_item(&mut self, item: SelectItem) -> Result<(), PipelineError> {
×
87
        let expr_items: Vec<(Expr, Option<String>)> = match item {
×
88
            SelectItem::UnnamedExpr(expr) => vec![(expr, None)],
×
89
            SelectItem::ExprWithAlias { expr, alias } => vec![(expr, Some(alias.value))],
×
90
            SelectItem::QualifiedWildcard(_, _) => panic!("not supported yet"),
×
91
            SelectItem::Wildcard(_) => panic!("not supported yet"),
×
92
        };
93

94
        for (expr, alias) in expr_items {
×
95
            let mut builder = ExpressionBuilder::new(
×
96
                self.input_schema.fields.len() + self.aggregation_output.len(),
×
97
            );
×
98
            let projection_expression = builder.build(true, &expr, &self.input_schema)?;
×
99

100
            for new_aggr in builder.aggregations {
×
101
                Self::append_to_schema(
×
102
                    &new_aggr,
×
103
                    alias.clone(),
×
104
                    &self.input_schema,
×
105
                    &mut self.post_aggregation_schema,
×
106
                )?;
×
107
                self.aggregation_output.push(new_aggr);
×
108
            }
109

110
            self.projection_output.push(projection_expression.clone());
×
111
            Self::append_to_schema(
×
112
                &projection_expression,
×
113
                alias,
×
114
                &self.post_aggregation_schema,
×
115
                &mut self.post_projection_schema,
×
116
            )?;
×
117
        }
118

119
        Ok(())
×
120
    }
×
121

122
    fn add_having_item(&mut self, expr: Expr) -> Result<(), PipelineError> {
54✔
123
        let mut builder = ExpressionBuilder::from(
54✔
124
            self.input_schema.fields.len(),
54✔
125
            self.aggregation_output.clone(),
54✔
126
        );
54✔
127
        let having_expression = builder.build(true, &expr, &self.input_schema)?;
54✔
128

129
        let mut post_aggregation_schema = self.input_schema.clone();
54✔
130
        let mut aggregation_output = Vec::new();
54✔
131

132
        for new_aggr in builder.aggregations {
109✔
133
            Self::append_to_schema(
55✔
134
                &new_aggr,
55✔
135
                None,
55✔
136
                &self.input_schema,
55✔
137
                &mut post_aggregation_schema,
55✔
138
            )?;
55✔
139
            aggregation_output.push(new_aggr);
55✔
140
        }
141
        self.aggregation_output = aggregation_output;
54✔
142
        self.post_aggregation_schema = post_aggregation_schema;
54✔
143

54✔
144
        self.having = Some(having_expression);
54✔
145

54✔
146
        Ok(())
54✔
147
    }
54✔
148

149
    fn add_groupby_items(&mut self, expr_items: Vec<Expr>) -> Result<(), PipelineError> {
381✔
150
        let mut indexes = vec![];
381✔
151
        let mut set_pk = true;
381✔
152
        for expr in expr_items {
781✔
153
            let mut builder = ExpressionBuilder::new(
398✔
154
                self.input_schema.fields.len() + self.aggregation_output.len(),
398✔
155
            );
398✔
156
            let groupby_expression = builder.build(false, &expr, &self.input_schema)?;
398✔
157
            self.groupby.push(groupby_expression.clone());
398✔
158

159
            if let Some(e) = self
398✔
160
                .projection_output
398✔
161
                .iter()
398✔
162
                .enumerate()
398✔
163
                .find(|e| e.1 == &groupby_expression)
487✔
164
            {
350✔
165
                indexes.push(e.0);
350✔
166
            } else {
350✔
167
                set_pk = false
50✔
168
            }
169
        }
170

171
        if set_pk {
383✔
172
            indexes.sort();
335✔
173
            self.post_projection_schema.primary_index = indexes;
335✔
174
        }
335✔
175

176
        Ok(())
383✔
177
    }
383✔
178

179
    pub fn plan(&mut self, select: Select) -> Result<(), PipelineError> {
1,159✔
180
        for expr in select.projection {
3,920✔
181
            self.add_select_item(expr)?;
2,765✔
182
        }
183
        if !select.group_by.is_empty() {
1,155✔
184
            self.add_groupby_items(select.group_by)?;
384✔
185
        }
771✔
186

187
        if let Some(having) = select.having {
1,155✔
188
            self.add_having_item(having)?;
55✔
189
        }
1,100✔
190

191
        Ok(())
1,154✔
192
    }
1,158✔
193

194
    pub fn new(input_schema: Schema) -> Self {
1,157✔
195
        Self {
1,157✔
196
            input_schema: input_schema.clone(),
1,157✔
197
            post_aggregation_schema: input_schema,
1,157✔
198
            post_projection_schema: Schema::empty(),
1,157✔
199
            aggregation_output: Vec::new(),
1,157✔
200
            having: None,
1,157✔
201
            groupby: Vec::new(),
1,157✔
202
            projection_output: Vec::new(),
1,157✔
203
        }
1,157✔
204
    }
1,157✔
205
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc