• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to the next uncovered line, or 'b' to go to the previous uncovered line

0.0
/dozer-orchestrator/src/simple/executor.rs
1
use dozer_api::grpc::internal_grpc::PipelineResponse;
2
use dozer_core::dag::app::{App, AppPipeline};
3
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
4
use dozer_types::indicatif::MultiProgress;
5
use dozer_types::types::{Operation, SchemaWithChangesType};
6
use std::collections::HashMap;
7
use std::path::PathBuf;
8
use std::sync::atomic::AtomicBool;
9
use std::sync::Arc;
10

11
use dozer_api::CacheEndpoint;
12
use dozer_types::models::source::Source;
13

14
use crate::pipeline::{CacheSinkFactory, CacheSinkSettings, StreamingSinkFactory};
15
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
16
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
17
use dozer_ingestion::connectors::{get_connector, get_connector_info_table, TableInfo};
18

19
use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};
20

21
use dozer_types::crossbeam;
22
use dozer_types::log::{error, info};
23

24
use dozer_types::models::connection::Connection;
25
use dozer_types::parking_lot::RwLock;
26
use OrchestrationError::ExecutionError;
27

28
use crate::console_helper::get_colored_text;
29
use crate::errors::OrchestrationError;
30
use crate::pipeline::source_builder::SourceBuilder;
31
use crate::simple::direct_cache_pipeline::source_to_pipeline;
32
use crate::{validate, validate_schema};
33

34
pub struct Executor {
    /// Source definitions to ingest from; grouped by connection in
    /// [`Self::get_connection_groups`].
    sources: Vec<Source>,
    /// Cache/endpoint pairs that cache sinks are built for.
    cache_endpoints: Vec<CacheEndpoint>,
    /// Directory used for pipeline state; `run` fails if it does not exist.
    pipeline_dir: PathBuf,
    /// Shared ingestor handle passed to the source manager
    /// (presumably the write side of ingestion — confirm in SourceBuilder).
    ingestor: Arc<RwLock<Ingestor>>,
    /// Shared ingestion iterator passed to the source manager
    /// (presumably the read side of ingestion — confirm in SourceBuilder).
    iterator: Arc<RwLock<IngestionIterator>>,
    /// Cooperative shutdown flag handed to `DagExecutor`.
    running: Arc<AtomicBool>,
    /// Progress-bar aggregator shared with every `CacheSinkFactory`.
    progress: MultiProgress,
}
43
impl Executor {
×
44
    pub fn new(
×
45
        sources: Vec<Source>,
×
46
        cache_endpoints: Vec<CacheEndpoint>,
×
47
        ingestor: Arc<RwLock<Ingestor>>,
×
48
        iterator: Arc<RwLock<IngestionIterator>>,
×
49
        running: Arc<AtomicBool>,
×
50
        pipeline_dir: PathBuf,
×
51
    ) -> Self {
×
52
        Self {
×
53
            sources,
×
54
            cache_endpoints,
×
55
            pipeline_dir,
×
56
            ingestor,
×
57
            iterator,
×
58
            running,
×
59
            progress: MultiProgress::new(),
×
60
        }
×
61
    }
×
62

×
63
    pub fn get_connection_groups(&self) -> HashMap<String, Vec<Source>> {
×
64
        SourceBuilder::group_connections(self.sources.clone())
×
65
    }
×
66

×
67
    pub fn validate_grouped_connections(
×
68
        grouped_connections: &HashMap<String, Vec<Source>>,
×
69
    ) -> Result<(), OrchestrationError> {
×
70
        for sources_group in grouped_connections.values() {
×
71
            let first_source = sources_group.get(0).unwrap();
×
72

×
73
            if let Some(connection) = &first_source.connection {
×
74
                let tables: Vec<TableInfo> = sources_group
×
75
                    .iter()
×
76
                    .map(|source| TableInfo {
×
77
                        name: source.name.clone(),
×
78
                        table_name: source.table_name.clone(),
×
79
                        id: 0,
×
80
                        columns: Some(source.columns.clone()),
×
81
                    })
×
82
                    .collect();
×
83

×
84
                if let Some(info_table) = get_connector_info_table(connection) {
×
85
                    info!("[{}] Connection parameters", connection.name);
×
86
                    info_table.printstd();
×
87
                }
×
88

×
89
                validate(connection.clone(), Some(tables.clone()))
×
90
                    .map_err(|e| {
×
91
                        error!(
×
92
                            "[{}] {} Connection validation error: {}",
×
93
                            connection.name,
×
94
                            get_colored_text("X", "31"),
×
95
                            e
96
                        );
×
97
                        OrchestrationError::SourceValidationError
×
98
                    })
×
99
                    .map(|_| {
×
100
                        info!(
×
101
                            "[{}] {} Connection validation completed",
×
102
                            connection.name,
×
103
                            get_colored_text("✓", "32")
×
104
                        );
×
105
                    })?;
×
106

×
107
                validate_schema(connection.clone(), &tables).map_or_else(
×
108
                    |e| {
×
109
                        error!(
×
110
                            "[{}] {} Schema validation error: {}",
×
111
                            connection.name,
×
112
                            get_colored_text("X", "31"),
×
113
                            e
114
                        );
×
115
                        Err(OrchestrationError::SourceValidationError)
×
116
                    },
×
117
                    |r| {
×
118
                        let mut all_valid = true;
×
119
                        for (table_name, validation_result) in r.into_iter() {
×
120
                            let is_valid =
×
121
                                validation_result.iter().all(|(_, result)| result.is_ok());
×
122

×
123
                            if is_valid {
×
124
                                info!(
×
125
                                    "[{}][{}] {} Schema validation completed",
×
126
                                    connection.name,
×
127
                                    table_name,
×
128
                                    get_colored_text("✓", "32")
×
129
                                );
130
                            } else {
×
131
                                all_valid = false;
×
132
                                for (_, error) in validation_result {
×
133
                                    if let Err(e) = error {
×
134
                                        error!(
×
135
                                            "[{}][{}] {} Schema validation error: {}",
×
136
                                            connection.name,
×
137
                                            table_name,
×
138
                                            get_colored_text("X", "31"),
×
139
                                            e
140
                                        );
×
141
                                    }
×
142
                                }
143
                            }
144
                        }
145

×
146
                        if !all_valid {
×
147
                            return Err(OrchestrationError::SourceValidationError);
×
148
                        }
×
149

×
150
                        Ok(())
×
151
                    },
×
152
                )?;
×
153
            }
×
154
        }
155

×
156
        Ok(())
×
157
    }
×
158

159
    // This function is used to run a query using a temporary pipeline
×
160
    pub fn query(
×
161
        &self,
×
162
        sql: String,
×
163
        sender: crossbeam::channel::Sender<Operation>,
×
164
    ) -> Result<dozer_core::dag::dag::Dag<SchemaSQLContext>, OrchestrationError> {
×
165
        let grouped_connections = self.get_connection_groups();
×
166

×
167
        let (mut pipeline, (query_name, query_port)) =
×
168
            statement_to_pipeline(&sql).map_err(OrchestrationError::PipelineError)?;
×
169
        pipeline.add_sink(
×
170
            Arc::new(StreamingSinkFactory::new(sender)),
×
171
            "streaming_sink",
×
172
        );
×
173
        pipeline
×
174
            .connect_nodes(
×
175
                &query_name,
×
176
                Some(query_port),
×
177
                "streaming_sink",
×
178
                Some(DEFAULT_PORT_HANDLE),
×
179
            )
×
180
            .map_err(OrchestrationError::ExecutionError)?;
×
181

×
182
        let used_sources: Vec<String> = pipeline.get_entry_points_sources_names();
×
183

×
184
        let asm = SourceBuilder::build_source_manager(
×
185
            used_sources,
×
186
            grouped_connections,
×
187
            self.ingestor.clone(),
×
188
            self.iterator.clone(),
×
189
            self.running.clone(),
×
190
        )?;
×
191
        let mut app = App::new(asm);
×
192
        app.add_pipeline(pipeline);
×
193

×
194
        let dag = app.get_dag().map_err(OrchestrationError::ExecutionError)?;
×
195
        let path = &self.pipeline_dir;
×
196
        let mut exec = DagExecutor::new(
×
197
            &dag,
×
198
            path.as_path(),
×
199
            ExecutorOptions::default(),
×
200
            self.running.clone(),
×
201
        )?;
×
202

×
203
        exec.start()?;
×
204
        Ok(dag)
×
205
    }
×
206

207
    // This function is used by both migrate and actual execution
×
208
    pub fn build_pipeline(
×
209
        &self,
×
210
        notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
×
211
        api_dir: PathBuf,
×
212
        settings: CacheSinkSettings,
×
213
    ) -> Result<dozer_core::dag::dag::Dag<SchemaSQLContext>, OrchestrationError> {
×
214
        let grouped_connections = self.get_connection_groups();
×
215

×
216
        Self::validate_grouped_connections(&grouped_connections)?;
×
217

×
218
        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];
×
219
        let mut used_sources = vec![];
×
220
        for cache_endpoint in self.cache_endpoints.iter().cloned() {
×
221
            let api_endpoint = cache_endpoint.endpoint.clone();
×
222
            let _api_endpoint_name = api_endpoint.name.clone();
×
223
            let cache = cache_endpoint.cache;
×
224

225
            // let mut pipeline = PipelineBuilder {}
226
            //     .build_pipeline(&api_endpoint.sql)
227
            //     .map_err(OrchestrationError::PipelineError)?;
228

×
229
            let (mut pipeline, (query_name, query_port)) = api_endpoint.sql.as_ref().map_or_else(
×
230
                || Ok(source_to_pipeline(&api_endpoint)),
×
231
                |sql| statement_to_pipeline(sql).map_err(OrchestrationError::PipelineError),
×
232
            )?;
×
233

×
234
            pipeline.add_sink(
×
235
                Arc::new(CacheSinkFactory::new(
×
236
                    vec![DEFAULT_PORT_HANDLE],
×
237
                    cache,
×
238
                    api_endpoint,
×
239
                    notifier.clone(),
×
240
                    api_dir.clone(),
×
241
                    self.progress.clone(),
×
242
                    settings.to_owned(),
×
243
                )),
×
244
                cache_endpoint.endpoint.name.as_str(),
×
245
            );
×
246

×
247
            pipeline
×
248
                .connect_nodes(
×
249
                    &query_name,
×
250
                    Some(query_port),
×
251
                    cache_endpoint.endpoint.name.as_str(),
×
252
                    Some(DEFAULT_PORT_HANDLE),
×
253
                )
×
254
                .map_err(ExecutionError)?;
×
255

×
256
            for name in pipeline.get_entry_points_sources_names() {
×
257
                used_sources.push(name);
×
258
            }
×
259

260
            pipelines.push(pipeline);
×
261
        }
×
262

×
263
        let asm = SourceBuilder::build_source_manager(
×
264
            used_sources,
×
265
            grouped_connections,
×
266
            self.ingestor.clone(),
×
267
            self.iterator.clone(),
×
268
            self.running.clone(),
×
269
        )?;
×
270
        let mut app = App::new(asm);
×
271

×
272
        Vec::into_iter(pipelines).for_each(|p| {
×
273
            app.add_pipeline(p);
×
274
        });
×
275

276
        let dag = app.get_dag().map_err(ExecutionError)?;
×
277

×
278
        DagExecutor::validate(&dag, &self.pipeline_dir)
×
279
            .map(|_| {
×
280
                info!("[pipeline] Validation completed");
×
281
            })
×
282
            .map_err(|e| {
×
283
                error!("[pipeline] Validation error: {}", e);
×
284
                OrchestrationError::PipelineValidationError
×
285
            })?;
×
286

×
287
        Ok(dag)
×
288
    }
×
289

×
290
    pub fn get_tables(
×
291
        connections: &Vec<Connection>,
×
292
    ) -> Result<HashMap<String, Vec<SchemaWithChangesType>>, OrchestrationError> {
×
293
        let mut schema_map = HashMap::new();
×
294
        for connection in connections {
×
295
            validate(connection.to_owned(), None)?;
×
296

×
297
            let connector = get_connector(connection.to_owned())?;
×
298
            let schema_tuples = connector.get_schemas(None)?;
×
299
            schema_map.insert(connection.name.to_owned(), schema_tuples);
×
300
        }
×
301

×
302
        Ok(schema_map)
×
303
    }
×
304

×
305
    pub fn run(
×
306
        &self,
×
307
        notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
×
308
        settings: CacheSinkSettings,
×
309
    ) -> Result<(), OrchestrationError> {
×
310
        let running_wait = self.running.clone();
×
311

×
312
        let parent_dag = self.build_pipeline(notifier, PathBuf::default(), settings)?;
×
313
        let path = &self.pipeline_dir;
×
314

×
315
        if !path.exists() {
×
316
            return Err(OrchestrationError::PipelineDirectoryNotFound(
×
317
                path.to_string_lossy().to_string(),
×
318
            ));
×
319
        }
×
320

×
321
        let mut exec = DagExecutor::new(
×
322
            &parent_dag,
×
323
            path.as_path(),
×
324
            ExecutorOptions::default(),
×
325
            running_wait,
×
326
        )?;
×
327

×
328
        exec.start()?;
×
329
        exec.join().map_err(ExecutionError)
×
330
    }
×
331
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc