getdozer / dozer · build 4698406303 (pending completion)
GitHub · Pull Request #1426: feat: implement python log bindings
Merge daefffe87 into b6889464a

1 of 1 new or added line in 1 file covered. (100.0%)
34863 of 45840 relevant lines covered (76.05%)
10764.36 hits per line

Source File: /dozer-orchestrator/src/pipeline/builder.rs (96.71% covered)

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_api::grpc::internal::internal_pipeline_server::PipelineEventSenders;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
use dozer_core::executor::DagExecutor;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::builder::{OutputNodeInfo, QueryContext, SchemaSQLContext};
use dozer_types::indicatif::MultiProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::source::Source;
use std::hash::Hash;
use std::path::Path;
use tokio::runtime::Runtime;

use crate::pipeline::{LogSinkFactory, LogSinkSettings};

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
use dozer_types::log::{error, info};
use OrchestrationError::ExecutionError;

pub enum OutputTableInfo {
    Transformed(OutputNodeInfo),
    Original(OriginalTableInfo),
}
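// Note (editor annotation): `Transformed` entries are output tables produced by the
// SQL pipeline, while `Original` entries map one-to-one to a connector source table;
// see `calculate_sources` and `build` below for how each variant is wired.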

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

pub struct CalculatedSources {
    pub original_sources: Vec<String>,
    pub transformed_sources: Vec<String>,
    pub query_context: Option<QueryContext>,
}

pub struct PipelineBuilder<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    api_endpoints: &'a [ApiEndpoint],
    pipeline_dir: &'a Path,
    progress: MultiProgress,
}
impl<'a> PipelineBuilder<'a> {
    pub fn new(
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        api_endpoints: &'a [ApiEndpoint],
        pipeline_dir: &'a Path,
        progress: MultiProgress,
    ) -> Self {
        Self {
            connections,
            sources,
            sql,
            api_endpoints,
            pipeline_dir,
            progress,
        }
    }

    // Based on used_sources, map each table to its connection name and create sources.
    // To avoid breaking current functionality, the current format is still supported.
    pub async fn get_grouped_tables(
        &self,
        original_sources: &[String],
    ) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
        let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

        let mut connector_map = HashMap::new();
        for connection in self.connections {
            let connector = get_connector(connection.clone())?;

            if let Some(info_table) = get_connector_info_table(connection) {
                info!("[{}] Connection parameters", connection.name);
                info_table.printstd();
            }

            let connector_tables = connector.list_tables().await?;

            // override source name if specified
            let connector_tables: Vec<Source> = connector_tables
                .iter()
                .map(|table| {
                    match self.sources.iter().find(|s| {
                        // TODO: @dario - Replace this line with the actual schema parsed from SQL
                        s.connection.as_ref().unwrap().name == connection.name
                            && s.table_name == table.name
                    }) {
                        Some(source) => source.clone(),
                        None => Source {
                            name: table.name.clone(),
                            table_name: table.name.clone(),
                            schema: table.schema.clone(),
                            connection: Some(connection.clone()),
                            ..Default::default()
                        },
                    }
                })
                .collect();

            connector_map.insert(connection.clone(), connector_tables);
        }

        for table_name in original_sources {
            let mut table_found = false;
            for (connection, tables) in connector_map.iter() {
                if let Some(source) = tables
                    .iter()
                    .find(|table| table.name == table_name.as_str())
                {
                    table_found = true;
                    grouped_connections
                        .entry(connection.clone())
                        .or_default()
                        .push(source.clone());
                }
            }

            if !table_found {
                error!("Table {} not found in any of the connections", table_name);
                return Err(OrchestrationError::SourceValidationError);
            }
        }

        Ok(grouped_connections)
    }
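    // Illustrative example (editor annotation, not from the source): given a single
    // Postgres connection named "pg" exposing tables "users" and "orders", and
    // original_sources of ["users", "orders"], this returns a map of
    // { pg connection => [users source, orders source] }. A requested table that no
    // connection exposes fails early with OrchestrationError::SourceValidationError.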

    // This function is used to figure out the sources that are used in the pipeline
    // based on the SQL and API endpoints
    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
        let mut original_sources = vec![];

        let mut query_ctx = None;
        let mut pipeline = AppPipeline::new();

        let mut transformed_sources = vec![];

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            query_ctx = Some(query_context.clone());

            for (name, _) in query_context.output_tables_map {
                if transformed_sources.contains(&name) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                transformed_sources.push(name.clone());
            }

            for name in query_context.used_sources {
                // Add all source tables to input tables
                original_sources.push(name);
            }
        }

        // Add used sources if they come directly from a source table
        for api_endpoint in self.api_endpoints {
            let table_name = &api_endpoint.table_name;

            // Don't add if the table is a result of SQL
            if !transformed_sources.contains(table_name) {
                original_sources.push(table_name.clone());
            }
        }
        dedup(&mut original_sources);
        dedup(&mut transformed_sources);

        Ok(CalculatedSources {
            original_sources,
            transformed_sources,
            query_context: query_ctx,
        })
    }
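    // Illustrative example (editor annotation, not from the source): if the SQL defines
    // a single output table "trips_summary" reading from source table "trips", and the
    // API endpoints expose "trips_summary" plus a raw table "drivers", then
    // transformed_sources == ["trips_summary"] and original_sources == ["trips", "drivers"].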

    // This function is used by both migrate and actual execution
    pub fn build(
        &self,
        runtime: Arc<Runtime>,
        settings: LogSinkSettings,
        notifier: Option<PipelineEventSenders>,
    ) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
        let calculated_sources = self.calculate_sources()?;

        debug!("Used Sources: {:?}", calculated_sources.original_sources);
        let grouped_connections =
            runtime.block_on(self.get_grouped_tables(&calculated_sources.original_sources))?;

        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

        let mut pipeline = AppPipeline::new();

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables
        for (connection, sources) in &grouped_connections {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection.name.to_string(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            for (name, table_info) in query_context.output_tables_map {
                if available_output_tables.contains_key(name.as_str()) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }
        }

        let source_builder =
            SourceBuilder::new(grouped_connections, Some(&self.progress), notifier.clone());

        let conn_ports = source_builder.get_ports();

        for api_endpoint in self.api_endpoints {
            let table_name = &api_endpoint.table_name;

            let table_info = available_output_tables
                .get(table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory = Arc::new(LogSinkFactory::new(
                settings.clone(),
                api_endpoint.clone(),
                self.progress.clone(),
                notifier.clone(),
            ));

            match table_info {
                OutputTableInfo::Transformed(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str());

                    pipeline
                        .connect_nodes(
                            &table_info.node,
                            Some(table_info.port),
                            api_endpoint.name.as_str(),
                            Some(DEFAULT_PORT_HANDLE),
                            true,
                        )
                        .map_err(ExecutionError)?;
                }
                OutputTableInfo::Original(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str());

                    let conn_port = conn_ports
                        .get(&(
                            table_info.connection_name.as_str(),
                            table_info.table_name.as_str(),
                        ))
                        .expect("port should be present based on source mapping");

                    pipeline
                        .connect_nodes(
                            &table_info.connection_name,
                            Some(*conn_port),
                            api_endpoint.name.as_str(),
                            Some(DEFAULT_PORT_HANDLE),
                            false,
                        )
                        .map_err(ExecutionError)?;
                }
            }
        }

        pipelines.push(pipeline);

        let asm = source_builder.build_source_manager(runtime)?;
        let mut app = App::new(asm);

        Vec::into_iter(pipelines).for_each(|p| {
            app.add_pipeline(p);
        });

        let dag = app.get_dag().map_err(ExecutionError)?;

        debug!("{}", dag);

        DagExecutor::validate(dag.clone(), self.pipeline_dir.to_path_buf())
            .map(|_| {
                info!("[pipeline] Validation completed");
            })
            .map_err(|e| {
                error!("[pipeline] Validation error: {}", e);
                OrchestrationError::PipelineValidationError
            })?;

        Ok(dag)
    }
}

fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut uniques = HashSet::new();
    v.retain(|e| uniques.insert(e.clone()));
}
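
For orientation, here is a minimal sketch (not part of the PR) of how this builder is typically driven from the orchestrator: construct a PipelineBuilder over the configured connections, sources, optional SQL, and API endpoints, then call build with a Tokio runtime and log-sink settings to obtain the validated DAG. The helper name build_pipeline_dag is hypothetical, the import paths are assumed from the file's location in the crate, and how LogSinkSettings is obtained is left to the caller; only the new and build signatures shown above are taken from the file.

use std::path::Path;
use std::sync::Arc;

use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::indicatif::MultiProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::source::Source;
use tokio::runtime::Runtime;

use crate::errors::OrchestrationError;
use crate::pipeline::builder::PipelineBuilder;
use crate::pipeline::LogSinkSettings;

// Hypothetical helper: wires up a PipelineBuilder and returns the validated DAG.
// Assembled only from the `new` and `build` signatures in builder.rs above.
fn build_pipeline_dag(
    connections: &[Connection],
    sources: &[Source],
    sql: Option<&str>,
    api_endpoints: &[ApiEndpoint],
    pipeline_dir: &Path,
    settings: LogSinkSettings,
    runtime: Arc<Runtime>,
) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
    let builder = PipelineBuilder::new(
        connections,
        sources,
        sql,
        api_endpoints,
        pipeline_dir,
        MultiProgress::new(),
    );
    // No gRPC progress notifier in this sketch; pass Some(senders) to stream pipeline events.
    builder.build(runtime, settings, None)
}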