
getdozer / dozer · build 4829268814 (pending completion)

GitHub Pull Request #1516: Prepare v0.1.19 (merge 845b68ec1 into f2ab0e6ce)

Build coverage: 35090 of 44737 relevant lines covered (78.44%) · 11496.87 hits per line

Source file: /dozer-orchestrator/src/simple/orchestrator.rs · 48.62% covered
use super::executor::Executor;
use crate::console_helper::get_colored_text;
use crate::errors::OrchestrationError;
use crate::pipeline::{LogSinkSettings, PipelineBuilder};
use crate::shutdown::ShutdownReceiver;
use crate::simple::helper::validate_config;
use crate::utils::{
    get_api_security_config, get_cache_manager_options, get_executor_options,
    get_file_buffer_capacity, get_grpc_config, get_rest_config,
};

use crate::{flatten_join_handle, Orchestrator};
use dozer_api::auth::{Access, Authorizer};
use dozer_api::generator::protoc::generator::ProtoGenerator;
use dozer_api::{grpc, rest, CacheEndpoint};
use dozer_cache::cache::LmdbRwCacheManager;
use dozer_cache::dozer_log::home_dir::HomeDir;
use dozer_cache::dozer_log::schemas::write_schema;
use dozer_core::app::AppPipeline;
use dozer_core::dag_schemas::DagSchemas;

use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
use dozer_core::errors::ExecutionError;
use dozer_ingestion::connectors::{SourceSchema, TableInfo};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::errors::PipelineError;
use dozer_types::crossbeam::channel::{self, Sender};
use dozer_types::indicatif::MultiProgress;
use dozer_types::log::{info, warn};
use dozer_types::models::app_config::Config;
use dozer_types::tracing::error;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::broadcast;

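// Single-process orchestrator that drives the whole app lifecycle: migration,
// pipeline execution, and API serving. Cloneable so the pipeline and API
// halves can run on separate threads (see `run_all`).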
#[derive(Clone)]
pub struct SimpleOrchestrator {
    pub config: Config,
    pub runtime: Arc<Runtime>,
    pub multi_pb: MultiProgress,
}

impl SimpleOrchestrator {
    pub fn new(config: Config, runtime: Arc<Runtime>) -> Self {
        Self {
            config,
            runtime,
            multi_pb: MultiProgress::new(),
        }
    }
}

impl Orchestrator for SimpleOrchestrator {
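    // Serves the configured endpoints over REST and gRPC. Opens a cache
    // endpoint per configured API endpoint (broadcasting streaming operations
    // when the `dynamic` flag is set), spawns the enabled servers as tokio
    // tasks, and drives all futures until shutdown or the first failure.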
    fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
        self.runtime.block_on(async {
            let mut futures = FuturesUnordered::new();

            // Open `RoCacheEndpoint`s, streaming operations to them if necessary.
            let flags = self.config.flags.clone().unwrap_or_default();
            let (operations_sender, operations_receiver) = if flags.dynamic {
                let (sender, receiver) = broadcast::channel(16);
                (Some(sender), Some(receiver))
            } else {
                (None, None)
            };

            let cache_manager = Arc::new(
                LmdbRwCacheManager::new(get_cache_manager_options(&self.config))
                    .map_err(OrchestrationError::RoCacheInitFailed)?,
            );
            let home_dir = HomeDir::new(
                self.config.home_dir.as_ref(),
                self.config.cache_dir.clone().into(),
            );
            let mut cache_endpoints = vec![];
            for endpoint in &self.config.endpoints {
                let (cache_endpoint, task) = CacheEndpoint::new(
                    &home_dir,
                    &*cache_manager,
                    endpoint.clone(),
                    self.runtime.clone(),
                    Box::pin(shutdown.create_shutdown_future()),
                    operations_sender.clone(),
                    Some(self.multi_pb.clone()),
                )
                .await?;
                if let Some(task) = task {
                    futures.push(flatten_join_handle(tokio::task::spawn_blocking(
                        move || task().map_err(OrchestrationError::CacheBuildFailed),
                    )));
                }
                cache_endpoints.push(Arc::new(cache_endpoint));
            }

            // Initialize API Server
            let rest_config = get_rest_config(self.config.to_owned());
            let rest_handle = if rest_config.enabled {
                let security = get_api_security_config(self.config.to_owned());
                let cache_endpoints_for_rest = cache_endpoints.clone();
                let shutdown_for_rest = shutdown.create_shutdown_future();
                tokio::spawn(async move {
                    let api_server = rest::ApiServer::new(rest_config, security);
                    api_server
                        .run(cache_endpoints_for_rest, shutdown_for_rest)
                        .await
                        .map_err(OrchestrationError::ApiServerFailed)
                })
            } else {
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
            };

            // Initialize gRPC Server
            let grpc_config = get_grpc_config(self.config.to_owned());
            let grpc_handle = if grpc_config.enabled {
                let api_security = get_api_security_config(self.config.to_owned());
                let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags);
                let shutdown = shutdown.create_shutdown_future();
                tokio::spawn(async move {
                    grpc_server
                        .run(cache_endpoints, shutdown, operations_receiver)
                        .await
                        .map_err(OrchestrationError::GrpcServerFailed)
                })
            } else {
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
            };

            futures.push(flatten_join_handle(rest_handle));
            futures.push(flatten_join_handle(grpc_handle));

            while let Some(result) = futures.next().await {
                result?;
            }

            Ok::<(), OrchestrationError>(())
        })?;

        Ok(())
    }

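    // Builds and runs the ingestion/transformation pipeline. Starts the
    // internal pipeline gRPC server on the runtime, wires it to the DAG
    // executor through crossbeam channels, and signals `api_notifier` once
    // the executor is constructed so the API server can start.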
    fn run_apps(
        &mut self,
        shutdown: ShutdownReceiver,
        api_notifier: Option<Sender<bool>>,
    ) -> Result<(), OrchestrationError> {
        // gRPC notifier channels
        let (alias_redirected_sender, alias_redirected_receiver) = channel::unbounded();
        let (operation_sender, operation_receiver) = channel::unbounded();
        let (status_update_sender, status_update_receiver) = channel::unbounded();
        let internal_app_config = self.config.clone();
        let _intern_pipeline_thread = self.runtime.spawn(async move {
            let result = start_internal_pipeline_server(
                internal_app_config,
                (
                    alias_redirected_receiver,
                    operation_receiver,
                    status_update_receiver,
                ),
            )
            .await;

            if let Err(e) = result {
                std::panic::panic_any(OrchestrationError::InternalServerFailed(e));
            }

            warn!("Shutting down internal pipeline server");
        });

        let home_dir = HomeDir::new(
            self.config.home_dir.as_ref(),
            self.config.cache_dir.clone().into(),
        );
        let executor = Executor::new(
            &home_dir,
            &self.config.connections,
            &self.config.sources,
            self.config.sql.as_deref(),
            &self.config.endpoints,
            shutdown.get_running_flag(),
            self.multi_pb.clone(),
        )?;
        let settings = LogSinkSettings {
            file_buffer_capacity: get_file_buffer_capacity(&self.config),
        };
        let dag_executor = executor.create_dag_executor(
            self.runtime.clone(),
            settings,
            get_executor_options(&self.config),
            Some((
                alias_redirected_sender,
                operation_sender,
                status_update_sender,
            )),
        )?;

        if let Some(api_notifier) = api_notifier {
            api_notifier
                .send(true)
                .expect("Failed to notify API server");
        }

        executor.run_dag_executor(dag_executor)
    }

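    // Lists, per connection name, the tables and source schemas discoverable
    // from the configured connections.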
    fn list_connectors(
        &self,
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
        self.runtime
            .block_on(Executor::get_tables(&self.config.connections))
    }

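    // Generates an `Access::All` auth token when JWT API security is
    // configured; errors if the api config or security settings are missing.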
    fn generate_token(&self) -> Result<String, OrchestrationError> {
        if let Some(api_config) = self.config.api.to_owned() {
            if let Some(api_security) = api_config.api_security {
                match api_security {
                    dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
                        let auth = Authorizer::new(&secret, None, None);
                        let token = auth.generate_token(Access::All, None).map_err(|err| {
                            OrchestrationError::GenerateTokenFailed(err.to_string())
                        })?;
                        return Ok(token);
                    }
                }
            }
        }
        Err(OrchestrationError::GenerateTokenFailed(
            "Missing api config or security input".to_owned(),
        ))
    }

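    // Prepares the app for a run: validates the config, creates a fresh
    // migration directory per endpoint, builds the pipeline DAG to derive the
    // sink schemas, then writes each schema and generates proto files plus a
    // gRPC descriptor per endpoint. `force` wipes existing state via `clean`.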
    fn migrate(&mut self, force: bool) -> Result<(), OrchestrationError> {
        let home_dir = HomeDir::new(
            self.config.home_dir.as_ref(),
            self.config.cache_dir.clone().into(),
        );

        info!(
            "Initiating app: {}",
            get_colored_text(&self.config.app_name, "35")
        );
        if force {
            self.clean()?;
        }
        validate_config(&self.config)?;

        // Always create a new migration for now.
        let mut endpoint_and_migration_paths = vec![];
        for endpoint in &self.config.endpoints {
            let migration_path = home_dir
                .create_new_migration(&endpoint.name)
                .map_err(|(path, error)| {
                    OrchestrationError::FailedToCreateMigration(path, error)
                })?;
            endpoint_and_migration_paths.push((endpoint, migration_path));
        }

        // Calculate schemas.
        let endpoint_and_log_paths = endpoint_and_migration_paths
            .iter()
            .map(|(endpoint, migration_path)| {
                ((*endpoint).clone(), migration_path.log_path.clone())
            })
            .collect();
        let builder = PipelineBuilder::new(
            &self.config.connections,
            &self.config.sources,
            self.config.sql.as_deref(),
            endpoint_and_log_paths,
            self.multi_pb.clone(),
        );
        let settings = LogSinkSettings {
            file_buffer_capacity: get_file_buffer_capacity(&self.config),
        };
        let dag = builder.build(self.runtime.clone(), settings, None)?;
        // Populate schemas.
        let dag_schemas = DagSchemas::new(dag)?;

        // Write schemas to pipeline_dir and generate proto files.
        let schemas = dag_schemas.get_sink_schemas();
        let api_config = self.config.api.clone().unwrap_or_default();
        for (endpoint_name, schema) in &schemas {
            let (endpoint, migration_path) = endpoint_and_migration_paths
                .iter()
                .find(|e| e.0.name == *endpoint_name)
                .expect("Sink name must be the same as endpoint name");

            let schema = write_schema(schema, endpoint, &migration_path.schema_path)
                .map_err(OrchestrationError::FailedToWriteSchema)?;

            let proto_folder_path = &migration_path.api_dir;
            ProtoGenerator::generate(
                proto_folder_path,
                endpoint_name,
                &schema,
                &api_config.api_security,
                &self.config.flags,
            )?;

            let mut resources = Vec::new();
            resources.push(endpoint_name);

            // Copy the common service so it is included in the descriptor.
            let common_resources = ProtoGenerator::copy_common(proto_folder_path)
                .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;
            resources.extend(common_resources.iter());

            // Generate a descriptor based on all proto files generated within the sink.
            ProtoGenerator::generate_descriptor(
                proto_folder_path,
                &migration_path.descriptor_path,
                &resources,
            )
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;
        }

        Ok(())
    }

    // Cleaning the entire folder, as there will be inconsistencies
    // between pipeline, cache and generated proto files.
    fn clean(&mut self) -> Result<(), OrchestrationError> {
        let cache_dir = PathBuf::from(self.config.cache_dir.clone());
        if cache_dir.exists() {
            fs::remove_dir_all(&cache_dir)
                .map_err(|e| ExecutionError::FileSystemError(cache_dir, e))?;
        };

        let home_dir = PathBuf::from(self.config.home_dir.clone());
        if home_dir.exists() {
            fs::remove_dir_all(&home_dir)
                .map_err(|e| ExecutionError::FileSystemError(home_dir, e))?;
        };

        Ok(())
    }

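    // Full lifecycle: migrate, run the pipeline on a dedicated thread, wait
    // for it to signal readiness, then serve the APIs on this thread.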
    fn run_all(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
        let shutdown_api = shutdown.clone();

        let mut dozer_api = self.clone();

        let (tx, rx) = channel::unbounded::<bool>();

        self.migrate(false)?;

        let mut dozer_pipeline = self.clone();
        let pipeline_thread = thread::spawn(move || dozer_pipeline.run_apps(shutdown, Some(tx)));

        // Wait for the pipeline to initialize caches before starting the API server.
        rx.recv().unwrap();

        dozer_api.run_api(shutdown_api)?;

        // Wait for the pipeline thread to shut down gracefully.
        pipeline_thread.join().unwrap()
    }
}

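// Validates the SQL transforms by attempting to build a pipeline from the
// statement, logging the outcome either way.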
pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
    statement_to_pipeline(&sql, &mut AppPipeline::new(), None).map_or_else(
        |e| {
            error!(
                "[sql][{}] Transforms validation error: {}",
                get_colored_text("X", "31"),
                e
            );
            Err(e)
        },
        |_| {
            info!(
                "[sql][{}] Transforms validation completed",
                get_colored_text("✓", "32")
            );
            Ok(())
        },
    )
}