• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.96
/dozer-orchestrator/src/lib.rs
1
pub mod cli;
2✔
2
pub mod errors;
3
pub mod pipeline;
4
pub mod shutdown;
5
pub mod simple;
6

7
use dozer_core::{app::AppPipeline, errors::ExecutionError};
8
use dozer_ingestion::connectors::SourceSchema;
9
use dozer_sql::pipeline::{builder::statement_to_pipeline, errors::PipelineError};
10
use dozer_types::{crossbeam::channel::Sender, log::debug};
11
use errors::OrchestrationError;
12
use shutdown::{ShutdownReceiver, ShutdownSender};
13
use std::{
14
    backtrace::{Backtrace, BacktraceStatus},
15
    collections::HashMap,
16
    panic, process,
17
    thread::current,
18
};
19
use tokio::task::JoinHandle;
20
mod console_helper;
21
pub mod utils;
22

23
pub trait Orchestrator {
24
    fn migrate(&mut self, force: bool) -> Result<(), OrchestrationError>;
25
    fn clean(&mut self) -> Result<(), OrchestrationError>;
26
    fn run_all(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError>;
27
    fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError>;
28
    fn run_apps(
29
        &mut self,
30
        shutdown: ShutdownReceiver,
31
        api_notifier: Option<Sender<bool>>,
32
    ) -> Result<(), OrchestrationError>;
33
    #[allow(clippy::type_complexity)]
34
    fn list_connectors(
35
        &self,
36
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError>;
37
    fn generate_token(&self) -> Result<String, OrchestrationError>;
38
}
39

40
// Re-exports
41
pub use dozer_ingestion::{
42
    connectors::{get_connector, TableInfo},
43
    errors::ConnectorError,
44
};
45
pub use dozer_sql::pipeline::builder::QueryContext;
46

47
pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
×
48
    let mut pipeline = AppPipeline::new();
×
49
    statement_to_pipeline(sql, &mut pipeline, None)
×
50
}
×
51

52
pub use dozer_types::models::connection::Connection;
53
use dozer_types::tracing::error;
54

55
async fn flatten_join_handle(
36✔
56
    handle: JoinHandle<Result<(), OrchestrationError>>,
36✔
57
) -> Result<(), OrchestrationError> {
36✔
58
    match handle.await {
36✔
59
        Ok(Ok(_)) => Ok(()),
36✔
60
        Ok(Err(err)) => Err(err),
×
61
        Err(err) => Err(OrchestrationError::InternalError(Box::new(err))),
×
62
    }
63
}
36✔
64

65
pub fn set_panic_hook() {
×
66
    panic::set_hook(Box::new(move |panic_info| {
×
67
        // All the orchestrator errors are captured here
68
        if let Some(e) = panic_info.payload().downcast_ref::<OrchestrationError>() {
×
69
            error!("{}", e);
×
70
            debug!("{:?}", e);
×
71
        // All the connector errors are captured here
72
        } else if let Some(e) = panic_info.payload().downcast_ref::<ConnectorError>() {
×
73
            error!("{}", e);
×
74
            debug!("{:?}", e);
×
75
        // All the pipeline errors are captured here
76
        } else if let Some(e) = panic_info.payload().downcast_ref::<ExecutionError>() {
×
77
            error!("{}", e);
×
78
            debug!("{:?}", e);
×
79
        // If any errors are sent as strings.
80
        } else if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
×
81
            error!("{s:?}");
×
82
        } else {
83
            error!("{}", panic_info);
×
84
        }
85

86
        let backtrace = Backtrace::capture();
×
87
        if backtrace.status() == BacktraceStatus::Captured {
×
88
            error!(
×
89
                "thread '{}' panicked at '{}'\n stack backtrace:\n{}",
×
90
                current()
×
91
                    .name()
×
92
                    .map(ToString::to_string)
×
93
                    .unwrap_or_default(),
×
94
                panic_info
×
95
                    .location()
×
96
                    .map(ToString::to_string)
×
97
                    .unwrap_or_default(),
×
98
                backtrace
×
99
            );
×
100
        }
×
101

102
        process::exit(1);
×
103
    }));
×
104
}
×
105

106
pub fn set_ctrl_handler(shutdown_sender: ShutdownSender) {
×
107
    let mut shutdown = Some(shutdown_sender);
×
108
    ctrlc::set_handler(move || {
×
109
        if let Some(shutdown) = shutdown.take() {
×
110
            shutdown.shutdown()
×
111
        }
×
112
    })
×
113
    .expect("Error setting Ctrl-C handler");
×
114
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc