• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-tracing/src/telemetry.rs
1
use dozer_types::log::debug;
2
use dozer_types::models::telemetry::{DozerTelemetryConfig, OpenTelemetryConfig, TelemetryConfig};
3
use opentelemetry::sdk;
4
use opentelemetry::sdk::trace::{BatchConfig, BatchSpanProcessor, Sampler};
5
use opentelemetry::trace::TracerProvider;
6
use opentelemetry::{global, sdk::propagation::TraceContextPropagator};
7
use tracing_appender::non_blocking::WorkerGuard;
8
use tracing_opentelemetry::OpenTelemetryLayer;
9
use tracing_subscriber::layer::SubscriberExt;
10
use tracing_subscriber::util::SubscriberInitExt;
11
use tracing_subscriber::{fmt, EnvFilter, Layer};
12

13
use crate::exporter::DozerExporter;
14
// Init telemetry by setting a global handler
15
pub fn init_telemetry(
×
16
    app_name: Option<&str>,
×
17
    telemetry_config: Option<TelemetryConfig>,
×
18
) -> WorkerGuard {
×
19
    let app_name = app_name.unwrap_or("dozer");
×
20

×
21
    // disable errors from open telemetry
×
22
    opentelemetry::global::set_error_handler(|_| {}).unwrap();
×
23

×
24
    debug!("Initializing telemetry for {:?}", telemetry_config);
×
25

26
    let fmt_layer = fmt::layer().with_target(false);
×
27
    let fmt_filter = EnvFilter::try_from_default_env()
×
28
        .or_else(|_| EnvFilter::try_new("info"))
×
29
        .unwrap();
×
30

×
31
    let log_writer_filter = EnvFilter::try_from_default_env()
×
32
        .or_else(|_| EnvFilter::try_new("info"))
×
33
        .unwrap();
×
34

×
35
    let layers = telemetry_config.map_or((None, None), |c| {
×
36
        let trace_filter = EnvFilter::try_from_env("DOZER_TRACE_FILTER")
×
37
            .or_else(|_| EnvFilter::try_new("dozer=debug"))
×
38
            .unwrap();
×
39
        match c {
×
40
            TelemetryConfig::Dozer(config) => (
×
41
                Some(get_dozer_tracer(config).with_filter(trace_filter)),
×
42
                None,
×
43
            ),
×
44
            TelemetryConfig::OpenTelemetry(config) => (
×
45
                None,
×
46
                Some(get_otel_tracer(app_name, config).with_filter(trace_filter)),
×
47
            ),
×
48
        }
49
    });
×
50

×
51
    let file_appender = tracing_appender::rolling::never("./log", format!("{app_name}.log"));
×
52
    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
×
53

×
54
    tracing_subscriber::registry()
×
55
        .with(fmt_layer.with_filter(fmt_filter))
×
56
        .with(
×
57
            fmt::Layer::default()
×
58
                .with_writer(non_blocking)
×
59
                .with_filter(log_writer_filter),
×
60
        )
×
61
        .with(layers.0)
×
62
        .with(layers.1)
×
63
        .init();
×
64

×
65
    guard
×
66
}
×
67

68
// Cleanly shutdown telemetry
69
pub fn shutdown_telemetry() {
×
70
    opentelemetry::global::shutdown_tracer_provider();
×
71
}
×
72

73
// Init telemetry with a closure without setting a global subscriber
74
pub fn init_telemetry_closure<T>(
×
75
    app_name: Option<&str>,
×
76
    telemetry_config: Option<TelemetryConfig>,
×
77
    closure: impl FnOnce() -> T,
×
78
) -> T {
×
79
    let app_name = app_name.unwrap_or("dozer");
×
80

×
81
    let fmt_layer = fmt::layer().with_target(false);
×
82
    let fmt_filter = EnvFilter::try_from_default_env()
×
83
        .or_else(|_| EnvFilter::try_new("info"))
×
84
        .unwrap();
×
85

×
86
    let log_writer_filter = EnvFilter::try_from_default_env()
×
87
        .or_else(|_| EnvFilter::try_new("info"))
×
88
        .unwrap();
×
89

×
90
    let layers = telemetry_config.map_or((None, None), |c| {
×
91
        let trace_filter = EnvFilter::try_from_env("DOZER_TRACE_FILTER")
×
92
            .or_else(|_| EnvFilter::try_new("dozer=trace"))
×
93
            .unwrap();
×
94
        match c {
×
95
            TelemetryConfig::Dozer(config) => (
×
96
                Some(get_dozer_tracer(config).with_filter(trace_filter)),
×
97
                None,
×
98
            ),
×
99
            TelemetryConfig::OpenTelemetry(config) => (
×
100
                None,
×
101
                Some(get_otel_tracer(app_name, config).with_filter(trace_filter)),
×
102
            ),
×
103
        }
104
    });
×
105

×
106
    let file_appender = tracing_appender::rolling::never("./log", format!("{app_name}.log"));
×
107
    let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
×
108

×
109
    let subscriber = tracing_subscriber::registry()
×
110
        .with(fmt::Layer::default().with_writer(non_blocking.clone()))
×
111
        .with(fmt_layer.with_filter(fmt_filter))
×
112
        .with(
×
113
            fmt::Layer::default()
×
114
                .with_writer(non_blocking)
×
115
                .with_filter(log_writer_filter),
×
116
        )
×
117
        .with(layers.0)
×
118
        .with(layers.1);
×
119

×
120
    dozer_types::tracing::subscriber::with_default(subscriber, closure)
×
121
}
×
122

123
pub fn get_otel_tracer<S>(
×
124
    app_name: &str,
×
125
    _config: OpenTelemetryConfig,
×
126
) -> OpenTelemetryLayer<S, opentelemetry::sdk::trace::Tracer>
×
127
where
×
128
    S: for<'span> tracing_subscriber::registry::LookupSpan<'span>
×
129
        + dozer_types::tracing::Subscriber,
×
130
{
×
131
    global::set_text_map_propagator(TraceContextPropagator::new());
×
132
    let tracer = opentelemetry_jaeger::new_agent_pipeline()
×
133
        .with_service_name(app_name)
×
134
        .install_batch(opentelemetry::runtime::TokioCurrentThread)
×
135
        .expect("Failed to install OpenTelemetry tracer.");
×
136

×
137
    tracing_opentelemetry::layer().with_tracer(tracer)
×
138
}
×
139

140
pub fn get_dozer_tracer<S>(
×
141
    config: DozerTelemetryConfig,
×
142
) -> OpenTelemetryLayer<S, opentelemetry::sdk::trace::Tracer>
×
143
where
×
144
    S: for<'span> tracing_subscriber::registry::LookupSpan<'span>
×
145
        + dozer_types::tracing::Subscriber,
×
146
{
×
147
    let builder = sdk::trace::TracerProvider::builder();
×
148
    let sample_percent = config.sample_percent as f64 / 100.0;
×
149
    let exporter = DozerExporter::new(config);
×
150
    let batch_config = BatchConfig::default()
×
151
        .with_max_concurrent_exports(100000)
×
152
        .with_max_concurrent_exports(5);
×
153
    let sampler = Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(sample_percent)));
×
154
    let batch_processor =
×
155
        BatchSpanProcessor::builder(exporter, opentelemetry::runtime::TokioCurrentThread)
×
156
            .with_batch_config(batch_config)
×
157
            .build();
×
158

×
159
    let tracer_provider = builder
×
160
        .with_config(opentelemetry::sdk::trace::Config {
×
161
            sampler: Box::new(sampler),
×
162
            ..Default::default()
×
163
        })
×
164
        .with_span_processor(batch_processor)
×
165
        .build();
×
166

×
167
    let tracer = tracer_provider.versioned_tracer(
×
168
        "opentelemetry-dozer",
×
169
        Some(env!("CARGO_PKG_VERSION")),
×
170
        None,
×
171
    );
×
172
    let _ = global::set_tracer_provider(tracer_provider);
×
173
    tracing_opentelemetry::layer().with_tracer(tracer)
×
174
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc