getdozer / dozer / 4075835066 (pending completion)

Pull Request #790 (GitHub): refactor: Use `daggy` for the underlying data structure of `Dag`
Merge 39f3c7143 into 3223082a5

396 of 396 new or added lines in 11 files covered (100.0%)
24551 of 36528 relevant lines covered (67.21%)
54898.93 hits per line

Source File
/dozer-orchestrator/src/simple/orchestrator.rs (0.0% covered)

use super::executor::Executor;
use crate::console_helper::get_colored_text;
use crate::errors::OrchestrationError;
use crate::pipeline::CacheSinkSettings;
use crate::utils::{
    get_api_dir, get_api_security_config, get_cache_dir, get_flags, get_grpc_config,
    get_pipeline_config, get_pipeline_dir, get_rest_config,
};
use crate::{flatten_joinhandle, Orchestrator};
use dozer_api::auth::{Access, Authorizer};
use dozer_api::generator::protoc::generator::ProtoGenerator;
use dozer_api::{
    actix_web::dev::ServerHandle,
    grpc::{
        self, internal::internal_pipeline_server::start_internal_pipeline_server,
        internal_grpc::PipelineResponse,
    },
    rest, CacheEndpoint,
};
use dozer_cache::cache::{CacheCommonOptions, CacheOptions, CacheReadOptions, CacheWriteOptions};
use dozer_cache::cache::{CacheOptionsKind, LmdbCache};
use dozer_core::dag::dag_schemas::{prepare_dag, DagSchemas};
use dozer_core::dag::errors::ExecutionError::InternalError;
use dozer_ingestion::ingestion::IngestionConfig;
use dozer_ingestion::ingestion::Ingestor;
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_types::crossbeam::channel::{self, unbounded, Sender};
use dozer_types::log::{info, warn};
use dozer_types::models::api_config::ApiConfig;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::app_config::Config;
use dozer_types::prettytable::{row, Table};
use dozer_types::serde_yaml;
use dozer_types::tracing::error;
use dozer_types::types::{Operation, Schema, SchemaWithChangesType};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{sync::Arc, thread};
use tokio::sync::{broadcast, oneshot};

#[derive(Default, Clone)]
pub struct SimpleOrchestrator {
    pub config: Config,
    pub cache_common_options: CacheCommonOptions,
    pub cache_read_options: CacheReadOptions,
    pub cache_write_options: CacheWriteOptions,
}

impl SimpleOrchestrator {
    pub fn new(config: &Config) -> Self {
        Self {
            config: config.clone(),
            ..Default::default()
        }
    }
    fn write_internal_config(&self) -> Result<(), OrchestrationError> {
        let path = Path::new(&self.config.home_dir).join("internal_config");
        if path.exists() {
            fs::remove_dir_all(&path).unwrap();
        }
        fs::create_dir_all(&path).unwrap();
        let yaml_path = path.join("config.yaml");
        let f = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .open(yaml_path)
            .expect("Couldn't open file");
        let api_config = self.config.api.to_owned().unwrap_or_default();
        let api_internal = api_config.to_owned().api_internal.unwrap_or_default();
        let pipeline_internal = api_config.pipeline_internal.unwrap_or_default();
        let mut internal_content = serde_yaml::Value::default();
        internal_content["api_internal"] = serde_yaml::to_value(api_internal)
            .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
        internal_content["pipeline_internal"] = serde_yaml::to_value(pipeline_internal)
            .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
        serde_yaml::to_writer(f, &internal_content)
            .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
        Ok(())
    }
}
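
SimpleOrchestrator is driven through the Orchestrator trait implemented just below (migrate, run_apps, run_api). As a rough, illustrative sketch of how the two halves might be wired together in one process, reusing this module's imports and assuming a Config already parsed from dozer's YAML (the actual dozer CLI wiring may differ):

// Illustrative driver, not part of this file.
fn start(config: Config) -> Result<(), OrchestrationError> {
    let running = Arc::new(AtomicBool::new(true));
    let (tx, rx) = channel::unbounded::<bool>();

    // Prepare home_dir artifacts (pipeline dir, generated protos, caches),
    // then run the pipeline half on its own thread.
    let mut pipeline = SimpleOrchestrator::new(&config);
    pipeline.migrate(false)?;
    let pipeline_running = running.clone();
    let pipeline_thread =
        thread::spawn(move || pipeline.run_apps(pipeline_running, Some(tx)));

    // run_apps signals once its cache endpoints exist, so the API half
    // does not try to open the LMDB caches before they are created.
    let _ = rx.recv();

    let mut api = SimpleOrchestrator::new(&config);
    api.run_api(running)?;

    pipeline_thread.join().expect("pipeline thread panicked")
}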

impl Orchestrator for SimpleOrchestrator {
    fn run_api(&mut self, running: Arc<AtomicBool>) -> Result<(), OrchestrationError> {
        // Channel to communicate CtrlC with API Server
        let (tx, rx) = unbounded::<ServerHandle>();
        // gRPC notifier channel
        let cache_dir = get_cache_dir(self.config.to_owned());

        // Flags
        let flags = self.config.flags.clone().unwrap_or_default();

        let mut cache_endpoints = vec![];
        for ce in &self.config.endpoints {
            let mut cache_common_options = self.cache_common_options.clone();
            cache_common_options.set_path(cache_dir.join(ce.name.clone()));
            cache_endpoints.push(CacheEndpoint {
                cache: Arc::new(
                    LmdbCache::new(CacheOptions {
                        common: cache_common_options,
                        kind: CacheOptionsKind::ReadOnly(self.cache_read_options.clone()),
                    })
                    .map_err(OrchestrationError::CacheInitFailed)?,
                ),
                endpoint: ce.to_owned(),
            });
        }

        let ce2 = cache_endpoints.clone();

        let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime");
        let (sender_shutdown, receiver_shutdown) = oneshot::channel::<()>();
        rt.block_on(async {
            let mut futures = FuturesUnordered::new();

            // Initialize API Server
            let rest_config = get_rest_config(self.config.to_owned());
            let security = get_api_security_config(self.config.to_owned());
            let rest_handle = tokio::spawn(async move {
                let api_server = rest::ApiServer::new(rest_config, security);
                api_server
                    .run(cache_endpoints, tx)
                    .await
                    .map_err(OrchestrationError::ApiServerFailed)
            });
            // Initiate Push Events
            // create broadcast channel
            let pipeline_config = get_pipeline_config(self.config.to_owned());

            let rx1 = if flags.push_events {
                let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);

                let handle = tokio::spawn(async move {
                    grpc::ApiServer::setup_broad_cast_channel(tx, pipeline_config)
                        .await
                        .map_err(OrchestrationError::GrpcServerFailed)
                });

                futures.push(flatten_joinhandle(handle));

                Some(rx1)
            } else {
                None
            };

            // Initialize GRPC Server

            let api_dir = get_api_dir(self.config.to_owned());
            let grpc_config = get_grpc_config(self.config.to_owned());

            let api_security = get_api_security_config(self.config.to_owned());
            let grpc_server = grpc::ApiServer::new(grpc_config, api_dir, api_security, flags);
            let grpc_handle = tokio::spawn(async move {
                grpc_server
                    .run(ce2, receiver_shutdown, rx1)
                    .await
                    .map_err(OrchestrationError::GrpcServerFailed)
            });

            futures.push(flatten_joinhandle(rest_handle));
            futures.push(flatten_joinhandle(grpc_handle));

            while let Some(result) = futures.next().await {
                result?;
            }
            Ok::<(), OrchestrationError>(())
        })?;

        let server_handle = rx
            .recv()
            .map_err(OrchestrationError::GrpcServerHandleError)?;

        // Waiting for Ctrl+C
        while running.load(Ordering::SeqCst) {}
        sender_shutdown.send(()).unwrap();
        rest::ApiServer::stop(server_handle);

        Ok(())
    }

    fn run_apps(
        &mut self,
        running: Arc<AtomicBool>,
        api_notifier: Option<Sender<bool>>,
    ) -> Result<(), OrchestrationError> {
        let pipeline_home_dir = get_pipeline_dir(self.config.to_owned());
        // gRPC notifier channel
        let (sender, receiver) = channel::unbounded::<PipelineResponse>();
        let internal_app_config = self.config.to_owned();
        let _intern_pipeline_thread = thread::spawn(move || {
            if let Err(e) = start_internal_pipeline_server(internal_app_config, receiver) {
                std::panic::panic_any(OrchestrationError::InternalServerFailed(e));
            }
            warn!("Shutting down internal pipeline server");
        });
        // Ingestion channel
        let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
        let cache_dir = get_cache_dir(self.config.to_owned());

        let cache_endpoints: Vec<CacheEndpoint> = self.get_cache_endpoints(cache_dir)?;

        if let Some(api_notifier) = api_notifier {
            api_notifier
                .send(true)
                .expect("Failed to notify API server");
        }

        let sources = self.config.sources.clone();

        let executor = Executor::new(
            sources,
            cache_endpoints,
            ingestor,
            iterator,
            running,
            pipeline_home_dir,
        );
        let flags = get_flags(self.config.clone());
        let api_security = get_api_security_config(self.config.clone());
        let settings = CacheSinkSettings::new(flags, api_security);
        executor.run(Some(sender), settings)
    }

    fn list_connectors(
        &self,
    ) -> Result<HashMap<String, Vec<SchemaWithChangesType>>, OrchestrationError> {
        Executor::get_tables(&self.config.connections)
    }

    fn generate_token(&self) -> Result<String, OrchestrationError> {
        if let Some(api_config) = self.config.api.to_owned() {
            if let Some(api_security) = api_config.api_security {
                match api_security {
                    dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
                        let auth = Authorizer::new(&secret, None, None);
                        let token = auth.generate_token(Access::All, None).map_err(|err| {
                            OrchestrationError::GenerateTokenFailed(err.to_string())
                        })?;
                        return Ok(token);
                    }
                }
            }
        }
        Err(OrchestrationError::GenerateTokenFailed(
            "Missing api config or security input".to_owned(),
        ))
    }

    fn query(
        &self,
        sql: String,
        sender: Sender<Operation>,
        running: Arc<AtomicBool>,
    ) -> Result<Schema, OrchestrationError> {
        // Ingestion channel
        let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());

        let sources = self.config.sources.clone();

        let pipeline_dir = tempdir::TempDir::new("query4")
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;
        let executor = Executor::new(
            sources,
            vec![],
            ingestor,
            iterator,
            running,
            pipeline_dir.into_path(),
        );

        let dag = executor.query(sql, sender)?;
        let dag_schemas = DagSchemas::new(&dag)?;
        let streaming_sink_handle = dag.sinks().next().expect("Sink is expected").0;
        let (schema, _ctx) = dag_schemas
            .get_node_input_schemas(streaming_sink_handle)?
            .values()
            .next()
            .expect("schema is expected")
            .clone();
        Ok(schema)
    }
    fn migrate(&mut self, force: bool) -> Result<(), OrchestrationError> {
        self.write_internal_config()
            .map_err(|e| InternalError(Box::new(e)))?;
        let pipeline_home_dir = get_pipeline_dir(self.config.to_owned());
        let api_dir = get_api_dir(self.config.to_owned());
        let cache_dir = get_cache_dir(self.config.to_owned());

        info!(
            "Initiating app: {}",
            get_colored_text(&self.config.app_name, "35")
        );
        if api_dir.exists() || pipeline_home_dir.exists() || cache_dir.exists() {
            if force {
                self.clean()?;
            } else {
                return Err(OrchestrationError::InitializationFailed(
                    self.config.home_dir.to_string(),
                ));
            }
        }

        info!(
            "Home dir: {}",
            get_colored_text(&self.config.home_dir, "35")
        );
        if let Some(api_config) = &self.config.api {
            print_api_config(api_config)
        }

        print_api_endpoints(&self.config.endpoints);
        validate_endpoints(&self.config.endpoints)?;

        // Ingestion channel
        let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());

        let cache_endpoints: Vec<CacheEndpoint> = self.get_cache_endpoints(cache_dir)?;

        let sources = self.config.sources.clone();

        let executor = Executor::new(
            sources,
            cache_endpoints,
            ingestor,
            iterator,
            Arc::new(AtomicBool::new(true)),
            pipeline_home_dir.clone(),
        );

        // Api Path
        let generated_path = api_dir.join("generated");
        if !generated_path.exists() {
            fs::create_dir_all(generated_path.clone()).map_err(|e| InternalError(Box::new(e)))?;
        }

        // Pipeline path
        fs::create_dir_all(pipeline_home_dir.clone()).map_err(|e| {
            OrchestrationError::PipelineDirectoryInitFailed(
                pipeline_home_dir.to_string_lossy().to_string(),
                e,
            )
        })?;
        let api_security = get_api_security_config(self.config.clone());
        let flags = get_flags(self.config.clone());
        let settings = CacheSinkSettings::new(flags, api_security);
        let dag = executor.build_pipeline(None, generated_path.clone(), settings)?;
        let dag_schemas = DagSchemas::new(&dag)?;
        // Every sink will initialize its schema in sink and also in a proto file.
        prepare_dag(&dag, &dag_schemas)?;

        let mut resources = Vec::new();
        for e in &self.config.endpoints {
            resources.push(e.name.clone());
        }

        // Copy common service to be included in descriptor.
        resources.push("common".to_string());

        ProtoGenerator::copy_common(&generated_path)
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;
        // Generate a descriptor based on all proto files generated within sink.
        ProtoGenerator::generate_descriptor(&generated_path, resources)
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;

        Ok(())
    }

    // Cleaning the entire folder as there will be inconsistencies
    // between pipeline, cache and generated proto files.
    fn clean(&mut self) -> Result<(), OrchestrationError> {
        let home_dir = PathBuf::from(self.config.home_dir.clone());
        if home_dir.exists() {
            fs::remove_dir_all(&home_dir).map_err(|e| InternalError(Box::new(e)))?;
        };
        Ok(())
    }
}
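
run_api above joins its REST and gRPC tasks through a FuturesUnordered of flatten_joinhandle futures, a helper defined elsewhere in this crate. The pattern collapses a JoinHandle<Result<T, E>> into a single Result, so a task that panicked and a task that returned an error surface the same way in the `while let` loop. A minimal sketch of that pattern, using only tokio's public API and not the crate's actual implementation, looks like:

use tokio::task::{JoinError, JoinHandle};

// Sketch only; dozer's real flatten_joinhandle lives elsewhere in the
// crate and maps the join error into OrchestrationError instead.
async fn flatten<T, E>(handle: JoinHandle<Result<T, E>>) -> Result<T, E>
where
    E: From<JoinError>,
{
    match handle.await {
        Ok(result) => result,                        // the task completed; forward its own Result
        Err(join_error) => Err(E::from(join_error)), // the task panicked or was cancelled
    }
}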

impl SimpleOrchestrator {
    fn get_cache_endpoints(
        &self,
        cache_dir: PathBuf,
    ) -> Result<Vec<CacheEndpoint>, OrchestrationError> {
        let mut cache_endpoints = Vec::new();
        for e in &self.config.endpoints {
            let mut cache_common_options = self.cache_common_options.clone();
            cache_common_options.set_path(cache_dir.join(e.name.clone()));
            cache_endpoints.push(CacheEndpoint {
                cache: Arc::new(
                    LmdbCache::new(CacheOptions {
                        common: cache_common_options,
                        kind: CacheOptionsKind::Write(self.cache_write_options.clone()),
                    })
                    .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?,
                ),
                endpoint: e.to_owned(),
            })
        }
        Ok(cache_endpoints)
    }
}

pub fn validate_endpoints(endpoints: &Vec<ApiEndpoint>) -> Result<(), OrchestrationError> {
    let mut is_all_valid = true;
    for endpoint in endpoints {
        endpoint.sql.as_ref().map_or_else(
            || {
                info!(
                    "[Endpoints][{}] {} Endpoint validation completed",
                    endpoint.name,
                    get_colored_text("✓", "32")
                );
            },
            |sql| {
                statement_to_pipeline(sql).map_or_else(
                    |e| {
                        is_all_valid = false;
                        error!(
                            "[Endpoints][{}] {} Endpoint validation error: {}",
                            endpoint.name,
                            get_colored_text("X", "31"),
                            e
                        );
                    },
                    |_| {
                        info!(
                            "[Endpoints][{}] {} Endpoint validation completed",
                            endpoint.name,
                            get_colored_text("✓", "32")
                        );
                    },
                );
            },
        );
    }

    if is_all_valid {
        Ok(())
    } else {
        Err(OrchestrationError::PipelineValidationError)
    }
}
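
As an illustrative check of the behaviour above (not from this file): endpoints without SQL are logged as valid immediately, endpoints whose SQL fails statement_to_pipeline flip is_all_valid, and a single PipelineValidationError is returned at the end. The sketch assumes ApiEndpoint implements Default and only sets the fields visible in this listing (name, path, sql); the real struct in dozer_types has more fields.

fn validation_example() -> Result<(), OrchestrationError> {
    let endpoints = vec![
        ApiEndpoint {
            name: "trips".to_string(),
            path: "/trips".to_string(),
            sql: Some("SELECT id FROM trips".to_string()),
            ..Default::default() // assumed Default impl
        },
        ApiEndpoint {
            name: "users".to_string(),
            path: "/users".to_string(),
            sql: None, // no SQL: reported as valid without parsing anything
            ..Default::default()
        },
    ];
    // Logs one ✓ or X line per endpoint; fails only if some SQL does not parse.
    validate_endpoints(&endpoints)
}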

fn print_api_config(api_config: &ApiConfig) {
    info!("[API] {}", get_colored_text("Configuration", "35"));
    let mut table_parent = Table::new();

    table_parent.add_row(row!["Type", "IP", "Port"]);
    if let Some(rest_config) = &api_config.rest {
        table_parent.add_row(row!["REST", rest_config.host, rest_config.port]);
    }

    if let Some(grpc_config) = &api_config.grpc {
        table_parent.add_row(row!["GRPC", grpc_config.host, grpc_config.port]);
    }

    table_parent.printstd();
}

fn print_api_endpoints(endpoints: &Vec<ApiEndpoint>) {
    info!("[API] {}", get_colored_text("Endpoints", "35"));
    let mut table_parent = Table::new();

    table_parent.add_row(row!["Path", "Name", "Sql"]);
    for endpoint in endpoints {
        let sql = endpoint
            .sql
            .as_ref()
            .map_or("-".to_string(), |sql| sql.clone());
        table_parent.add_row(row![endpoint.path, endpoint.name, sql]);
    }

    table_parent.printstd();
}
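
Finally, generate_token (defined earlier in this file) only succeeds when the config carries a JWT api_security section. A small, illustrative sketch of requesting a token and attaching it to a request, where the Authorization: Bearer header name is an assumption not shown in this listing:

// Illustrative only, not part of orchestrator.rs.
fn print_master_token(config: &Config) -> Result<(), OrchestrationError> {
    let orchestrator = SimpleOrchestrator::new(config);
    // Fails with GenerateTokenFailed unless api.api_security is Jwt(secret).
    let token = orchestrator.generate_token()?;
    println!("Authorization: Bearer {}", token);
    Ok(())
}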