• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829321272

pending completion
4829321272

Pull #1515

github

GitHub
Merge b6d982211 into f2ab0e6ce
Pull Request #1515: feat: Run migration only when necessary

193 of 193 new or added lines in 11 files covered. (100.0%)

35565 of 45252 relevant lines covered (78.59%)

16462.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.91
/dozer-api/src/grpc/typed/service.rs
1
use super::{
2
    codec::TypedCodec,
3
    helper::{
4
        count_response_to_typed_response, on_event_to_typed_response,
5
        query_response_to_typed_response, token_response,
6
    },
7
    DynamicMessage, TypedResponse,
8
};
9
use crate::{
10
    auth::{Access, Authorizer},
11
    errors::{GenerationError, GrpcError},
12
    generator::protoc::generator::{
13
        CountResponseDesc, EventDesc, ProtoGenerator, QueryResponseDesc, ServiceDesc,
14
        TokenResponseDesc,
15
    },
16
    grpc::shared_impl,
17
    CacheEndpoint,
18
};
19
use dozer_cache::CacheReader;
20
use dozer_types::log::error;
21
use dozer_types::{grpc_types::types::Operation, models::api_security::ApiSecurity};
22
use futures_util::future;
23
use prost_reflect::{MethodDescriptor, Value};
24
use std::{borrow::Cow, collections::HashMap, convert::Infallible};
25
use tokio_stream::wrappers::ReceiverStream;
26
use tonic::{
27
    codegen::{self, *},
28
    metadata::MetadataMap,
29
    Code, Extensions, Request, Response, Status,
30
};
31

32
/// One endpoint served by the typed gRPC service: the cache endpoint that supplies the
/// data, paired with the protobuf service description read for it in `TypedService::new`.
#[derive(Debug, Clone)]
41✔
33
struct TypedEndpoint {
34
    /// Shared handle to the cache endpoint; its `cache_reader()` and `endpoint.name` are
    /// what the request handlers read from.
    cache_endpoint: Arc<CacheEndpoint>,
35
    /// Descriptors of the service's methods (`count`, `query`, and the optional
    /// `on_event` / `token`) and of their response messages.
    service_desc: ServiceDesc,
36
}
37

38
/// gRPC service that dispatches requests for the generated, per-endpoint ("typed")
/// services. Incoming requests are routed by the service and method names found in the
/// request path.
pub struct TypedService {
39
    /// Compression settings handed to `apply_compression_config` for every request
    /// (accepted encodings).
    accept_compression_encodings: EnabledCompressionEncodings,
40
    /// Compression settings handed to `apply_compression_config` for every request
    /// (encodings used when sending).
    send_compression_encodings: EnabledCompressionEncodings,
41
    /// For looking up an endpoint by its full service name. `key == value.service_desc.service.full_name()`.
42
    endpoint_map: HashMap<String, TypedEndpoint>,
43
    /// Optional broadcast receiver of operations; it is resubscribed for each on-event
    /// call and whenever the service is cloned.
    event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
44
    /// Optional API security configuration; the token method builds its `Authorizer`
    /// from it.
    security: Option<ApiSecurity>,
45
}
46

47
impl Clone for TypedService {
48
    fn clone(&self) -> Self {
41✔
49
        Self {
41✔
50
            accept_compression_encodings: self.accept_compression_encodings,
41✔
51
            send_compression_encodings: self.send_compression_encodings,
41✔
52
            endpoint_map: self.endpoint_map.clone(),
41✔
53
            event_notifier: self.event_notifier.as_ref().map(|r| r.resubscribe()),
41✔
54
            security: self.security.to_owned(),
41✔
55
        }
41✔
56
    }
41✔
57
}
58

59
impl TypedService {
60
    pub fn new(
16✔
61
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
16✔
62
        event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
16✔
63
        security: Option<ApiSecurity>,
16✔
64
    ) -> Result<Self, GrpcError> {
16✔
65
        let endpoint_map = cache_endpoints
16✔
66
            .into_iter()
16✔
67
            .map(|cache_endpoint| {
16✔
68
                let service_desc = ProtoGenerator::read_schema(
16✔
69
                    cache_endpoint.descriptor_path(),
16✔
70
                    &cache_endpoint.endpoint.name,
16✔
71
                )?;
16✔
72
                Ok::<_, GenerationError>((
16✔
73
                    service_desc.service.full_name().to_string(),
16✔
74
                    TypedEndpoint {
16✔
75
                        cache_endpoint,
16✔
76
                        service_desc,
16✔
77
                    },
16✔
78
                ))
16✔
79
            })
16✔
80
            .collect::<Result<HashMap<_, _>, _>>()?;
16✔
81
        Ok(Self {
16✔
82
            accept_compression_encodings: EnabledCompressionEncodings::default(),
16✔
83
            send_compression_encodings: EnabledCompressionEncodings::default(),
16✔
84
            endpoint_map,
16✔
85
            event_notifier,
16✔
86
            security,
16✔
87
        })
16✔
88
    }
16✔
89

90
    fn create_grpc(&self, method_desc: MethodDescriptor) -> tonic::server::Grpc<TypedCodec> {
9✔
91
        tonic::server::Grpc::new(TypedCodec::new(method_desc)).apply_compression_config(
9✔
92
            self.accept_compression_encodings,
9✔
93
            self.send_compression_encodings,
9✔
94
        )
9✔
95
    }
9✔
96

97
    fn call_impl<B: Body + Send + 'static>(
8✔
98
        &mut self,
8✔
99
        req: http::Request<B>,
8✔
100
    ) -> Option<BoxFuture<http::Response<tonic::body::BoxBody>, Infallible>>
8✔
101
    where
8✔
102
        B::Error: Into<StdError> + Send + 'static,
8✔
103
    {
8✔
104
        // full name will be in the format of `/dozer.generated.users.Users/query`
8✔
105
        let current_path: Vec<&str> = req.uri().path().split('/').collect();
8✔
106
        if current_path.len() != 3 {
8✔
107
            return None;
×
108
        }
8✔
109

8✔
110
        let full_service_name = current_path[1];
8✔
111
        let typed_endpoint = self.endpoint_map.get(full_service_name)?;
8✔
112

113
        let method_name = current_path[2];
8✔
114
        if method_name == typed_endpoint.service_desc.count.method.name() {
8✔
115
            struct CountService {
116
                cache_endpoint: Arc<CacheEndpoint>,
117
                response_desc: Option<CountResponseDesc>,
118
            }
119
            impl tonic::server::UnaryService<DynamicMessage> for CountService {
120
                type Response = TypedResponse;
121
                type Future = future::Ready<Result<Response<TypedResponse>, Status>>;
122
                fn call(&mut self, request: Request<DynamicMessage>) -> Self::Future {
3✔
123
                    let response = count(
3✔
124
                        request,
3✔
125
                        &self.cache_endpoint.cache_reader(),
3✔
126
                        &self.cache_endpoint.endpoint.name,
3✔
127
                        self.response_desc
3✔
128
                            .take()
3✔
129
                            .expect("This future shouldn't be polled twice"),
3✔
130
                    );
3✔
131
                    future::ready(response)
3✔
132
                }
3✔
133
            }
134

135
            let mut grpc = self.create_grpc(typed_endpoint.service_desc.count.method.clone());
3✔
136
            let method = CountService {
3✔
137
                cache_endpoint: typed_endpoint.cache_endpoint.clone(),
3✔
138
                response_desc: Some(typed_endpoint.service_desc.count.response_desc.clone()),
3✔
139
            };
3✔
140
            Some(Box::pin(async move {
3✔
141
                let res = grpc.unary(method, req).await;
3✔
142
                Ok(res)
3✔
143
            }))
3✔
144
        } else if method_name == typed_endpoint.service_desc.query.method.name() {
5✔
145
            struct QueryService {
146
                cache_endpoint: Arc<CacheEndpoint>,
147
                response_desc: Option<QueryResponseDesc>,
148
            }
149
            impl tonic::server::UnaryService<DynamicMessage> for QueryService {
150
                type Response = TypedResponse;
151
                type Future = future::Ready<Result<Response<TypedResponse>, Status>>;
152
                fn call(&mut self, request: Request<DynamicMessage>) -> Self::Future {
3✔
153
                    let response = query(
3✔
154
                        request,
3✔
155
                        &self.cache_endpoint.cache_reader(),
3✔
156
                        &self.cache_endpoint.endpoint.name,
3✔
157
                        self.response_desc
3✔
158
                            .take()
3✔
159
                            .expect("This future shouldn't be polled twice"),
3✔
160
                    );
3✔
161
                    future::ready(response)
3✔
162
                }
3✔
163
            }
164

165
            let mut grpc = self.create_grpc(typed_endpoint.service_desc.query.method.clone());
3✔
166
            let method = QueryService {
3✔
167
                cache_endpoint: typed_endpoint.cache_endpoint.clone(),
3✔
168
                response_desc: Some(typed_endpoint.service_desc.query.response_desc.clone()),
3✔
169
            };
3✔
170
            Some(Box::pin(async move {
3✔
171
                let res = grpc.unary(method, req).await;
3✔
172
                Ok(res)
3✔
173
            }))
3✔
174
        } else if let Some(on_event_method_desc) = &typed_endpoint.service_desc.on_event {
2✔
175
            if method_name == on_event_method_desc.method.name() {
2✔
176
                struct EventService {
177
                    cache_endpoint: Arc<CacheEndpoint>,
178
                    event_desc: Option<EventDesc>,
179
                    event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
180
                }
181
                impl tonic::server::ServerStreamingService<DynamicMessage> for EventService {
182
                    type Response = TypedResponse;
183

184
                    type ResponseStream = ReceiverStream<Result<TypedResponse, tonic::Status>>;
185

186
                    type Future =
187
                        future::Ready<Result<tonic::Response<Self::ResponseStream>, tonic::Status>>;
188
                    fn call(&mut self, request: tonic::Request<DynamicMessage>) -> Self::Future {
3✔
189
                        future::ready(on_event(
3✔
190
                            request,
3✔
191
                            &self.cache_endpoint.cache_reader(),
3✔
192
                            &self.cache_endpoint.endpoint.name,
3✔
193
                            self.event_desc
3✔
194
                                .take()
3✔
195
                                .expect("This future shouldn't be polled twice"),
3✔
196
                            self.event_notifier.take(),
3✔
197
                        ))
3✔
198
                    }
3✔
199
                }
200

201
                let mut grpc = self.create_grpc(on_event_method_desc.method.clone());
3✔
202
                let method = EventService {
3✔
203
                    cache_endpoint: typed_endpoint.cache_endpoint.clone(),
3✔
204
                    event_desc: Some(on_event_method_desc.response_desc.clone()),
3✔
205
                    event_notifier: self.event_notifier.as_ref().map(|r| r.resubscribe()),
3✔
206
                };
3✔
207
                Some(Box::pin(async move {
3✔
208
                    let res = grpc.server_streaming(method, req).await;
3✔
209
                    Ok(res)
3✔
210
                }))
3✔
211
            } else {
212
                None
×
213
            }
214
        } else if let Some(token_method_desc) = &typed_endpoint.service_desc.token {
×
215
            if method_name == token_method_desc.method.name() {
×
216
                struct AuthService {
217
                    response_desc: Option<TokenResponseDesc>,
218
                    security: Option<ApiSecurity>,
219
                }
220
                impl tonic::server::UnaryService<DynamicMessage> for AuthService {
221
                    type Response = TypedResponse;
222
                    type Future = future::Ready<Result<Response<TypedResponse>, Status>>;
223
                    fn call(&mut self, request: Request<DynamicMessage>) -> Self::Future {
×
224
                        let response = token(
×
225
                            request,
×
226
                            self.security.take(),
×
227
                            self.response_desc
×
228
                                .take()
×
229
                                .expect("This future shouldn't be polled twice"),
×
230
                        );
×
231
                        future::ready(response)
×
232
                    }
×
233
                }
234

235
                let mut grpc = self.create_grpc(token_method_desc.method.clone());
×
236
                let method = AuthService {
×
237
                    response_desc: Some(token_method_desc.response_desc.clone()),
×
238
                    security: self.security.clone(),
×
239
                };
×
240
                Some(Box::pin(async move {
×
241
                    let res = grpc.unary(method, req).await;
×
242
                    Ok(res)
×
243
                }))
×
244
            } else {
245
                None
×
246
            }
247
        } else {
248
            None
×
249
        }
250
    }
9✔
251
}
252
impl<B> codegen::Service<http::Request<B>> for TypedService
253
where
254
    B: Body + Send + 'static,
255
    B::Error: Into<StdError> + Send + 'static,
256
{
257
    type Response = http::Response<tonic::body::BoxBody>;
258
    type Error = std::convert::Infallible;
259
    type Future = BoxFuture<Self::Response, Self::Error>;
260
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
9✔
261
        Poll::Ready(Ok(()))
9✔
262
    }
9✔
263
    fn call(&mut self, req: http::Request<B>) -> Self::Future {
9✔
264
        match self.call_impl(req) {
9✔
265
            Some(fut) => fut,
9✔
266
            None => Box::pin(async move {
×
267
                Ok(http::Response::builder()
×
268
                    .status(200)
×
269
                    .header("grpc-status", "12")
×
270
                    .header("content-type", "application/grpc")
×
271
                    .body(empty_body())
×
272
                    .unwrap())
×
273
            }),
×
274
        }
275
    }
9✔
276
}
277

278
/// Gives the service the static name tonic asks for.
impl tonic::server::NamedService for TypedService {
279
    // NOTE(review): presumably this name is what tonic's router matches request paths
    // against; the leading `:` looks deliberate but its purpose is not visible here —
    // confirm before changing.
    const NAME: &'static str = ":dozer.generated";
280
}
281

282
fn parse_request(
6✔
283
    (_, extensions, query_request): &mut (MetadataMap, Extensions, DynamicMessage),
6✔
284
) -> Result<(Option<Cow<str>>, Option<Access>), Status> {
6✔
285
    let access = extensions.remove::<Access>();
6✔
286

6✔
287
    let query = query_request.get_field_by_name("query");
6✔
288
    let query = query
6✔
289
        .map(|query| match query {
6✔
290
            Cow::Owned(query) => {
2✔
291
                if let Value::String(query) = query {
2✔
292
                    Ok(Cow::Owned(query))
2✔
293
                } else {
294
                    Err(Status::new(Code::InvalidArgument, "query must be a string"))
×
295
                }
296
            }
297
            Cow::Borrowed(query) => query
4✔
298
                .as_str()
4✔
299
                .map(Cow::Borrowed)
4✔
300
                .ok_or_else(|| Status::new(Code::InvalidArgument, "query must be a string")),
4✔
301
        })
6✔
302
        .transpose()?;
6✔
303
    Ok((query, access))
6✔
304
}
6✔
305

306
fn count(
3✔
307
    request: Request<DynamicMessage>,
3✔
308
    reader: &CacheReader,
3✔
309
    endpoint: &str,
3✔
310
    response_desc: CountResponseDesc,
3✔
311
) -> Result<Response<TypedResponse>, Status> {
3✔
312
    let mut parts = request.into_parts();
3✔
313
    let (query, access) = parse_request(&mut parts)?;
3✔
314

315
    let count = shared_impl::count(reader, query.as_deref(), endpoint, access)?;
3✔
316
    let res = count_response_to_typed_response(count, response_desc).map_err(|e| {
3✔
317
        error!("Count API error: {:?}", e);
×
318
        Status::internal("Count API error")
×
319
    })?;
3✔
320
    Ok(Response::new(res))
3✔
321
}
3✔
322

323
fn query(
3✔
324
    request: Request<DynamicMessage>,
3✔
325
    reader: &CacheReader,
3✔
326
    endpoint: &str,
3✔
327
    response_desc: QueryResponseDesc,
3✔
328
) -> Result<Response<TypedResponse>, Status> {
3✔
329
    let mut parts = request.into_parts();
3✔
330
    let (query, access) = parse_request(&mut parts)?;
3✔
331

332
    let records = shared_impl::query(reader, query.as_deref(), endpoint, access)?;
3✔
333
    let res = query_response_to_typed_response(records, response_desc).map_err(|e| {
3✔
334
        error!("Query API error: {:?}", e);
×
335
        Status::internal("Query API error")
×
336
    })?;
3✔
337
    Ok(Response::new(res))
3✔
338
}
3✔
339

340
fn on_event(
3✔
341
    request: Request<DynamicMessage>,
3✔
342
    reader: &CacheReader,
3✔
343
    endpoint_name: &str,
3✔
344
    event_desc: EventDesc,
3✔
345
    event_notifier: Option<tokio::sync::broadcast::Receiver<Operation>>,
3✔
346
) -> Result<Response<ReceiverStream<Result<TypedResponse, tonic::Status>>>, Status> {
3✔
347
    let parts = request.into_parts();
3✔
348
    let extensions = parts.1;
3✔
349
    let query_request = parts.2;
3✔
350
    let access = extensions.get::<Access>();
3✔
351
    let filter = query_request.get_field_by_name("filter");
3✔
352
    let filter = filter
3✔
353
        .as_ref()
3✔
354
        .map(|filter| {
3✔
355
            filter
3✔
356
                .as_str()
3✔
357
                .ok_or_else(|| Status::new(Code::InvalidArgument, "filter must be a string"))
3✔
358
        })
3✔
359
        .transpose()?;
3✔
360

361
    let endpoint_to_be_streamed = endpoint_name.to_string();
3✔
362
    shared_impl::on_event(reader, filter, event_notifier, access.cloned(), move |op| {
3✔
363
        if endpoint_to_be_streamed == op.endpoint_name {
×
364
            match on_event_to_typed_response(op, event_desc.clone()) {
×
365
                Ok(event) => Some(Ok(event)),
×
366
                Err(e) => {
×
367
                    error!("On event error: {:?}", e);
×
368
                    None
×
369
                }
370
            }
371
        } else {
372
            None
×
373
        }
374
    })
3✔
375
}
3✔
376

377
fn token(
378
    request: Request<DynamicMessage>,
379
    security: Option<ApiSecurity>,
380
    response_desc: TokenResponseDesc,
381
) -> Result<Response<TypedResponse>, Status> {
382
    if let Some(security) = security {
×
383
        let _parts = request.into_parts();
×
384

×
385
        let auth = Authorizer::from(&security);
×
386
        let token = auth.generate_token(Access::All, None).unwrap();
×
387
        let res = token_response(token, response_desc);
×
388
        Ok(Response::new(res))
×
389
    } else {
390
        Err(Status::unavailable("security config unavailable"))
×
391
    }
392
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc