• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4698406303

pending completion
4698406303

Pull #1426

github

GitHub
Merge daefffe87 into b6889464a
Pull Request #1426: feat: implement python log bindings

1 of 1 new or added line in 1 file covered. (100.0%)

34863 of 45840 relevant lines covered (76.05%)

10764.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.05
/dozer-api/src/grpc/client_server.rs
1
use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::TypedService};
2
use crate::grpc::auth::AuthService;
3
use crate::grpc::health::HealthService;
4
use crate::grpc::{common, typed};
5
use crate::{errors::GrpcError, generator::protoc::generator::ProtoGenerator, CacheEndpoint};
6
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
7
use dozer_types::grpc_types::types::Operation;
8
use dozer_types::grpc_types::{
9
    auth::auth_grpc_service_server::AuthGrpcServiceServer,
10
    common::common_grpc_service_server::CommonGrpcServiceServer,
11
    health::health_grpc_service_server::HealthGrpcServiceServer,
12
};
13
use dozer_types::tracing::Level;
14
use dozer_types::{
15
    log::info,
16
    models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
17
};
18
use futures_util::stream::{AbortHandle, Abortable, Aborted};
19
use futures_util::Future;
20
use std::{collections::HashMap, path::PathBuf, sync::Arc};
21
use tokio::sync::broadcast::{self, Receiver};
22
use tonic::transport::Server;
23
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
24
use tower::Layer;
25
use tower_http::trace::{self, TraceLayer};
26

27
pub struct ApiServer {
28
    port: u16,
29
    host: String,
30
    api_dir: PathBuf,
31
    security: Option<ApiSecurity>,
32
    flags: Flags,
33
}
34

35
impl ApiServer {
36
    fn get_dynamic_service(
15✔
37
        &self,
15✔
38
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
15✔
39
        operations_receiver: Option<broadcast::Receiver<Operation>>,
15✔
40
    ) -> Result<
15✔
41
        (
15✔
42
            Option<TypedService>,
15✔
43
            ServerReflectionServer<impl ServerReflection>,
15✔
44
        ),
15✔
45
        GrpcError,
15✔
46
    > {
15✔
47
        info!(
15✔
48
            "Starting gRPC server on http://{}:{} with security: {}",
×
49
            self.host,
×
50
            self.port,
×
51
            self.security
×
52
                .as_ref()
×
53
                .map_or("None".to_string(), |s| match s {
×
54
                    ApiSecurity::Jwt(_) => "JWT".to_string(),
×
55
                })
×
56
        );
57

58
        let descriptor_path = ProtoGenerator::descriptor_path(&self.api_dir);
15✔
59

60
        let descriptor_bytes = ProtoGenerator::read_descriptor_bytes(&descriptor_path)?;
15✔
61

62
        let inflection_service = tonic_reflection::server::Builder::configure()
15✔
63
            .register_encoded_file_descriptor_set(&descriptor_bytes)
15✔
64
            .build()?;
15✔
65

66
        // Service handling dynamic gRPC requests.
67
        let typed_service = if self.flags.dynamic {
15✔
68
            Some(TypedService::new(
15✔
69
                &descriptor_path,
15✔
70
                cache_endpoints,
15✔
71
                operations_receiver,
15✔
72
                self.security.clone(),
15✔
73
            )?)
15✔
74
        } else {
75
            None
×
76
        };
77

78
        Ok((typed_service, inflection_service))
15✔
79
    }
15✔
80

81
    pub fn new(
15✔
82
        grpc_config: GrpcApiOptions,
15✔
83
        api_dir: PathBuf,
15✔
84
        security: Option<ApiSecurity>,
15✔
85
        flags: Flags,
15✔
86
    ) -> Self {
15✔
87
        Self {
15✔
88
            port: grpc_config.port as u16,
15✔
89
            host: grpc_config.host,
15✔
90
            api_dir,
15✔
91
            security,
15✔
92
            flags,
15✔
93
        }
15✔
94
    }
15✔
95

96
    pub async fn run(
12✔
97
        &self,
12✔
98
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
12✔
99
        shutdown: impl Future<Output = ()> + Send + 'static,
12✔
100
        operations_receiver: Option<Receiver<Operation>>,
12✔
101
    ) -> Result<(), GrpcError> {
12✔
102
        // Create our services.
12✔
103
        let mut web_config = tonic_web::config();
12✔
104
        if self.flags.grpc_web {
12✔
105
            web_config = web_config.allow_all_origins();
12✔
106
        }
12✔
107

108
        let common_service = CommonGrpcServiceServer::new(CommonService::new(
12✔
109
            cache_endpoints.clone(),
12✔
110
            operations_receiver.as_ref().map(|r| r.resubscribe()),
12✔
111
        ));
12✔
112
        let common_service = web_config.enable(common_service);
12✔
113

114
        let (typed_service, reflection_service) =
12✔
115
            self.get_dynamic_service(cache_endpoints, operations_receiver)?;
12✔
116
        let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
12✔
117
        let reflection_service = web_config.enable(reflection_service);
12✔
118

12✔
119
        let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
12✔
120
        service_map.insert("".to_string(), ServingStatus::Serving);
12✔
121
        service_map.insert(common::SERVICE_NAME.to_string(), ServingStatus::Serving);
12✔
122
        if typed_service.is_some() {
12✔
123
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::Serving);
12✔
124
        } else {
12✔
125
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::NotServing);
×
126
        }
×
127
        let health_service = HealthGrpcServiceServer::new(HealthService {
12✔
128
            serving_status: service_map,
12✔
129
        });
12✔
130
        let health_service = web_config.enable(health_service);
12✔
131

12✔
132
        // Auth middleware.
12✔
133
        let auth_middleware = AuthMiddlewareLayer::new(self.security.clone());
12✔
134

12✔
135
        // Authenticated services.
12✔
136
        let common_service = auth_middleware.layer(common_service);
12✔
137
        let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
12✔
138
        let mut authenticated_reflection_service = None;
12✔
139
        let mut unauthenticated_reflection_service = None;
12✔
140
        if self.flags.authenticate_server_reflection {
12✔
141
            authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
×
142
        } else {
12✔
143
            unauthenticated_reflection_service = Some(reflection_service);
12✔
144
        };
12✔
145
        let health_service = auth_middleware.layer(health_service);
12✔
146

12✔
147
        let mut auth_service = None;
12✔
148
        if self.security.is_some() {
12✔
149
            let service = web_config.enable(AuthGrpcServiceServer::new(AuthService::new(
×
150
                self.security.clone(),
×
151
            )));
×
152
            auth_service = Some(auth_middleware.layer(service));
×
153
        }
12✔
154

155
        // Add services to server.
156
        let mut grpc_router = Server::builder()
12✔
157
            .layer(
12✔
158
                TraceLayer::new_for_http()
12✔
159
                    .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
12✔
160
                    .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
12✔
161
                    .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
12✔
162
            )
12✔
163
            .accept_http1(true)
12✔
164
            .concurrency_limit_per_connection(32)
12✔
165
            .add_service(common_service)
12✔
166
            .add_optional_service(typed_service);
12✔
167

168
        if let Some(reflection_service) = authenticated_reflection_service {
12✔
169
            grpc_router = grpc_router.add_service(reflection_service);
×
170
        }
12✔
171
        if let Some(reflection_service) = unauthenticated_reflection_service {
12✔
172
            grpc_router = grpc_router.add_service(reflection_service);
12✔
173
        }
12✔
174

175
        grpc_router = grpc_router.add_service(health_service);
12✔
176
        grpc_router = grpc_router.add_optional_service(auth_service);
12✔
177

12✔
178
        // Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
12✔
179
        // So we just abort the server when the shutdown signal is received.
12✔
180
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
12✔
181
        tokio::spawn(async move {
12✔
182
            shutdown.await;
12✔
183
            abort_handle.abort();
12✔
184
        });
12✔
185

12✔
186
        // Run server.
12✔
187
        let addr = format!("{:}:{:}", self.host, self.port).parse().unwrap();
12✔
188
        match Abortable::new(grpc_router.serve(addr), abort_registration).await {
24✔
189
            Ok(result) => result.map_err(GrpcError::Transport),
×
190
            Err(Aborted) => Ok(()),
12✔
191
        }
192
    }
12✔
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc