• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829321272

pending completion
4829321272

Pull #1515

github

GitHub
Merge b6d982211 into f2ab0e6ce
Pull Request #1515: feat: Run migration only when necessary

193 of 193 new or added lines in 11 files covered. (100.0%)

35565 of 45252 relevant lines covered (78.59%)

16462.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.59
/dozer-sql/src/pipeline/planner/projection.rs
1
#![allow(dead_code)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::ExpressionBuilder;
5
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
6
use dozer_types::types::{FieldDefinition, Schema};
7
use sqlparser::ast::{Expr, Ident, Select, SelectItem};
8

9
/// Selects how a primary key should be treated.
///
/// NOTE(review): nothing in this file reads the variants, so their exact
/// semantics are defined by the callers; the variant docs below only restate
/// the names and should be confirmed against those call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrimaryKeyAction {
    /// Presumably keeps the existing primary key (confirm against callers).
    Retain,
    /// Presumably removes the primary key (confirm against callers).
    Drop,
    /// Presumably forces a primary key to be set (confirm against callers).
    Force,
}
15

16
/// Planning state for one `SELECT`: the expressions built from its projection,
/// `GROUP BY` and `HAVING` parts, plus the schemas that result from them.
pub struct CommonPlanner {
17
    /// Schema of the incoming records, as passed to `new`.
    input_schema: Schema,
18
    /// Starts as a copy of the input schema; one field is appended for every
    /// aggregation found while planning.
    pub post_aggregation_schema: Schema,
19
    /// Starts empty; receives one field per projected expression.
    pub post_projection_schema: Schema,
20
    // Vector of aggregations to be appended to the original record
21
    pub aggregation_output: Vec<Expression>,
22
    /// The built `HAVING` expression, or `None` when the query has none.
    pub having: Option<Expression>,
23
    /// The built `GROUP BY` expressions, in the order they were planned.
    pub groupby: Vec<Expression>,
24
    /// The built projection expressions, in the order they were planned.
    pub projection_output: Vec<Expression>,
25
}
26

27
impl CommonPlanner {
28
    fn append_to_schema(
2,291✔
29
        expr: &Expression,
2,291✔
30
        alias: Option<String>,
2,291✔
31
        input_schema: &Schema,
2,291✔
32
        output_schema: &mut Schema,
2,291✔
33
    ) -> Result<(), PipelineError> {
2,291✔
34
        let expr_type = expr.get_type(input_schema)?;
2,291✔
35
        output_schema.fields.push(FieldDefinition::new(
2,291✔
36
            alias.unwrap_or_else(|| expr.to_string(input_schema)),
2,291✔
37
            expr_type.return_type,
2,291✔
38
            expr_type.nullable,
2,291✔
39
            expr_type.source,
2,291✔
40
        ));
2,291✔
41

2,291✔
42
        Ok(())
2,291✔
43
    }
2,291✔
44

45
    fn add_select_item(&mut self, item: SelectItem) -> Result<(), PipelineError> {
1,891✔
46
        let expr_items: Vec<(Expr, Option<String>)> = match item {
1,891✔
47
            SelectItem::UnnamedExpr(expr) => vec![(expr, None)],
1,697✔
48
            SelectItem::ExprWithAlias { expr, alias } => vec![(expr, Some(alias.value))],
170✔
49
            SelectItem::QualifiedWildcard(_, _) => panic!("not supported yet"),
×
50
            SelectItem::Wildcard(_) => self
24✔
51
                .input_schema
24✔
52
                .fields
24✔
53
                .iter()
24✔
54
                .map(|col| (Expr::Identifier(Ident::new(col.to_owned().name)), None))
64✔
55
                .collect(),
24✔
56
        };
57

58
        for (expr, alias) in expr_items {
3,815✔
59
            let mut builder = ExpressionBuilder::new(
1,929✔
60
                self.input_schema.fields.len() + self.aggregation_output.len(),
1,929✔
61
            );
1,929✔
62
            let projection_expression = builder.build(true, &expr, &self.input_schema)?;
1,929✔
63

64
            for new_aggr in builder.aggregations {
2,248✔
65
                Self::append_to_schema(
323✔
66
                    &new_aggr,
323✔
67
                    alias.clone(),
323✔
68
                    &self.input_schema,
323✔
69
                    &mut self.post_aggregation_schema,
323✔
70
                )?;
323✔
71
                self.aggregation_output.push(new_aggr);
319✔
72
            }
73

74
            self.projection_output.push(projection_expression.clone());
1,925✔
75
            Self::append_to_schema(
1,925✔
76
                &projection_expression,
1,925✔
77
                alias,
1,925✔
78
                &self.post_aggregation_schema,
1,925✔
79
                &mut self.post_projection_schema,
1,925✔
80
            )?;
1,925✔
81
        }
82

83
        Ok(())
1,886✔
84
    }
1,886✔
85

86
    fn add_join_item(&mut self, item: SelectItem) -> Result<(), PipelineError> {
×
87
        let expr_items: Vec<(Expr, Option<String>)> = match item {
×
88
            SelectItem::UnnamedExpr(expr) => vec![(expr, None)],
×
89
            SelectItem::ExprWithAlias { expr, alias } => vec![(expr, Some(alias.value))],
×
90
            SelectItem::QualifiedWildcard(_, _) => panic!("not supported yet"),
×
91
            SelectItem::Wildcard(_) => panic!("not supported yet"),
×
92
        };
93

94
        for (expr, alias) in expr_items {
×
95
            let mut builder = ExpressionBuilder::new(
×
96
                self.input_schema.fields.len() + self.aggregation_output.len(),
×
97
            );
×
98
            let projection_expression = builder.build(true, &expr, &self.input_schema)?;
×
99

100
            for new_aggr in builder.aggregations {
×
101
                Self::append_to_schema(
×
102
                    &new_aggr,
×
103
                    alias.clone(),
×
104
                    &self.input_schema,
×
105
                    &mut self.post_aggregation_schema,
×
106
                )?;
×
107
                self.aggregation_output.push(new_aggr);
×
108
            }
109

110
            self.projection_output.push(projection_expression.clone());
×
111
            Self::append_to_schema(
×
112
                &projection_expression,
×
113
                alias,
×
114
                &self.post_aggregation_schema,
×
115
                &mut self.post_projection_schema,
×
116
            )?;
×
117
        }
118

119
        Ok(())
×
120
    }
×
121

122
    fn add_having_item(&mut self, expr: Expr) -> Result<(), PipelineError> {
38✔
123
        let mut builder = ExpressionBuilder::from(
38✔
124
            self.input_schema.fields.len(),
38✔
125
            self.aggregation_output.clone(),
38✔
126
        );
38✔
127
        let having_expression = builder.build(true, &expr, &self.input_schema)?;
38✔
128

129
        let mut post_aggregation_schema = self.input_schema.clone();
38✔
130
        let mut aggregation_output = Vec::new();
38✔
131

132
        for new_aggr in builder.aggregations {
77✔
133
            Self::append_to_schema(
39✔
134
                &new_aggr,
39✔
135
                None,
39✔
136
                &self.input_schema,
39✔
137
                &mut post_aggregation_schema,
39✔
138
            )?;
39✔
139
            aggregation_output.push(new_aggr);
39✔
140
        }
141
        self.aggregation_output = aggregation_output;
38✔
142
        self.post_aggregation_schema = post_aggregation_schema;
38✔
143

38✔
144
        self.having = Some(having_expression);
38✔
145

38✔
146
        Ok(())
38✔
147
    }
38✔
148

149
    fn add_groupby_items(&mut self, expr_items: Vec<Expr>) -> Result<(), PipelineError> {
274✔
150
        let mut indexes = vec![];
274✔
151
        let mut set_pk = true;
274✔
152
        for expr in expr_items {
561✔
153
            let mut builder = ExpressionBuilder::new(
284✔
154
                self.input_schema.fields.len() + self.aggregation_output.len(),
284✔
155
            );
284✔
156
            let groupby_expression = builder.build(false, &expr, &self.input_schema)?;
284✔
157
            self.groupby.push(groupby_expression.clone());
284✔
158

159
            if let Some(e) = self
285✔
160
                .projection_output
284✔
161
                .iter()
284✔
162
                .enumerate()
284✔
163
                .find(|e| e.1 == &groupby_expression)
351✔
164
            {
253✔
165
                indexes.push(e.0);
253✔
166
            } else {
253✔
167
                set_pk = false
34✔
168
            }
169
        }
170

171
        if set_pk {
277✔
172
            indexes.sort();
243✔
173
            self.post_projection_schema.primary_index = indexes;
243✔
174
        }
243✔
175

176
        Ok(())
277✔
177
    }
277✔
178

179
    pub fn plan(&mut self, select: Select) -> Result<(), PipelineError> {
795✔
180
        for expr in select.projection {
2,684✔
181
            self.add_select_item(expr)?;
1,893✔
182
        }
183
        if !select.group_by.is_empty() {
791✔
184
            self.add_groupby_items(select.group_by)?;
278✔
185
        }
513✔
186

187
        if let Some(having) = select.having {
791✔
188
            self.add_having_item(having)?;
37✔
189
        }
754✔
190

191
        Ok(())
792✔
192
    }
796✔
193

194
    pub fn new(input_schema: Schema) -> Self {
793✔
195
        Self {
793✔
196
            input_schema: input_schema.clone(),
793✔
197
            post_aggregation_schema: input_schema,
793✔
198
            post_projection_schema: Schema::empty(),
793✔
199
            aggregation_output: Vec::new(),
793✔
200
            having: None,
793✔
201
            groupby: Vec::new(),
793✔
202
            projection_output: Vec::new(),
793✔
203
        }
793✔
204
    }
793✔
205
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc