• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4195006582

pending completion
4195006582

Pull #918

github

GitHub
Merge 14ceac9a6 into dc166625f
Pull Request #918: ObjectStore - Reduce code duplication & more clone prevention

177 of 177 new or added lines in 5 files covered. (100.0%)

24368 of 37519 relevant lines covered (64.95%)

43913.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.75
/dozer-api/src/grpc/typed/tests/service.rs
1
use crate::{
2
    auth::{Access, Authorizer},
3
    grpc::{
4
        auth_middleware::AuthMiddlewareLayer,
5
        client_server::ApiServer,
6
        internal_grpc::PipelineResponse,
7
        typed::{
8
            tests::{
9
                fake_internal_pipeline_server::start_fake_internal_grpc_pipeline,
10
                generated::films::{
11
                    films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
12
                    QueryFilmsResponse,
13
                },
14
            },
15
            TypedService,
16
        },
17
    },
18
    RoCacheEndpoint,
19
};
20
use dozer_cache::{
21
    cache::expression::{FilterExpression, QueryExpression},
22
    CacheReader,
23
};
24
use dozer_types::models::{api_config::default_api_config, api_security::ApiSecurity};
25
use futures_util::FutureExt;
26
use std::{env, path::PathBuf, str::FromStr, time::Duration};
27

28
use super::{generated::films::FilmEventRequest, types::EventType};
29
use crate::test_utils;
30
use tokio::{
31
    sync::{
32
        broadcast::{self, Receiver},
33
        oneshot,
34
    },
35
    time::timeout,
36
};
37
use tokio_stream::StreamExt;
38
use tonic::{
39
    metadata::MetadataValue,
40
    transport::{Endpoint, Server},
41
    Code, Request,
42
};
43

44
pub fn setup_pipeline() -> (Vec<RoCacheEndpoint>, Receiver<PipelineResponse>) {
11✔
45
    let endpoint = test_utils::get_endpoint();
11✔
46
    let cache_endpoint = RoCacheEndpoint {
11✔
47
        cache_reader: CacheReader::new(test_utils::initialize_cache(&endpoint.name, None)),
11✔
48
        endpoint,
11✔
49
    };
11✔
50

11✔
51
    let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
11✔
52
    let default_api_internal = default_api_config().pipeline_internal.unwrap_or_default();
11✔
53
    tokio::spawn(async {
11✔
54
        ApiServer::setup_broad_cast_channel(tx, default_api_internal)
8✔
55
            .await
65✔
56
            .unwrap();
3✔
57
    });
11✔
58

11✔
59
    (vec![cache_endpoint], rx1)
11✔
60
}
11✔
61

×
62
fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
7✔
63
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
7✔
64

7✔
65
    let path = out_dir.join("generated_films.bin");
7✔
66

7✔
67
    let (endpoints, rx1) = setup_pipeline();
7✔
68

7✔
69
    TypedService::new(&path, endpoints, Some(rx1), security).unwrap()
7✔
70
}
7✔
71

×
72
async fn test_grpc_count_and_query_common(
4✔
73
    port: u32,
4✔
74
    request: QueryFilmsRequest,
4✔
75
    api_security: Option<ApiSecurity>,
4✔
76
    access_token: Option<String>,
4✔
77
) -> Result<(CountFilmsResponse, QueryFilmsResponse), tonic::Status> {
4✔
78
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
4✔
79
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
4✔
80
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
4✔
81
        default_pipeline_internal.host,
4✔
82
        default_pipeline_internal.port,
4✔
83
        rx_internal,
4✔
84
    ));
4✔
85

4✔
86
    let typed_service = setup_typed_service(api_security.to_owned());
4✔
87
    let (_tx, rx) = oneshot::channel::<()>();
4✔
88
    // middleware
4✔
89
    let layer = tower::ServiceBuilder::new()
4✔
90
        .layer(AuthMiddlewareLayer::new(api_security.to_owned()))
4✔
91
        .into_inner();
4✔
92
    let _jh = tokio::spawn(async move {
4✔
93
        Server::builder()
4✔
94
            .layer(layer)
4✔
95
            .add_service(typed_service)
4✔
96
            .serve_with_shutdown(format!("127.0.0.1:{port:}").parse().unwrap(), rx.map(drop))
4✔
97
            .await
4✔
98
            .unwrap();
×
99
    });
4✔
100
    tokio::time::sleep(Duration::from_millis(100)).await;
4✔
101
    let channel = Endpoint::from_str(&format!("http://127.0.0.1:{port:}"))
4✔
102
        .unwrap()
4✔
103
        .connect()
4✔
104
        .await
8✔
105
        .unwrap();
4✔
106
    if api_security.is_some() {
4✔
107
        let my_token = access_token.unwrap_or_default();
2✔
108
        let mut client = FilmsClient::with_interceptor(channel, move |mut req: Request<()>| {
3✔
109
            let token: MetadataValue<_> = format!("Bearer {my_token:}").parse().unwrap();
3✔
110
            req.metadata_mut().insert("authorization", token);
3✔
111
            Ok(req)
3✔
112
        });
3✔
113
        let res = client.count(Request::new(request.clone())).await?;
4✔
114
        let count_response = res.into_inner();
1✔
115
        let res = client.query(Request::new(request)).await?;
2✔
116
        let query_response = res.into_inner();
1✔
117
        _ = sender_shutdown_internal.send(());
1✔
118
        Ok((count_response, query_response))
1✔
119
    } else {
×
120
        let mut client = FilmsClient::new(channel);
2✔
121
        let res = client.count(Request::new(request.clone())).await?;
4✔
122
        let count_response = res.into_inner();
2✔
123
        let res = client.query(Request::new(request)).await?;
4✔
124
        let query_response = res.into_inner();
2✔
125
        _ = sender_shutdown_internal.send(());
2✔
126
        Ok((count_response, query_response))
2✔
127
    }
×
128
}
4✔
129

×
130
#[tokio::test]
1✔
131
async fn test_grpc_query() {
1✔
132
    // create filter expression
1✔
133
    let filter = FilterExpression::Simple(
1✔
134
        "film_id".to_string(),
1✔
135
        dozer_cache::cache::expression::Operator::EQ,
1✔
136
        dozer_types::serde_json::Value::from(524),
1✔
137
    );
1✔
138

1✔
139
    let query = QueryExpression {
1✔
140
        filter: Some(filter),
1✔
141
        ..QueryExpression::default()
1✔
142
    };
1✔
143
    let request = QueryFilmsRequest {
1✔
144
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
145
    };
1✔
146

147
    let (count_response, query_response) =
1✔
148
        test_grpc_count_and_query_common(1402, request, None, None)
1✔
149
            .await
7✔
150
            .unwrap();
1✔
151
    assert_eq!(count_response.count, query_response.records.len() as u64);
1✔
152
    assert!(!query_response.records.len() > 0);
1✔
153
}
1✔
154

×
155
#[tokio::test]
1✔
156
async fn test_grpc_query_with_access_token() {
1✔
157
    // create filter expression
1✔
158
    let filter = FilterExpression::Simple(
1✔
159
        "film_id".to_string(),
1✔
160
        dozer_cache::cache::expression::Operator::EQ,
1✔
161
        dozer_types::serde_json::Value::from(524),
1✔
162
    );
1✔
163

1✔
164
    let query = QueryExpression {
1✔
165
        filter: Some(filter),
1✔
166
        ..QueryExpression::default()
1✔
167
    };
1✔
168
    let request = QueryFilmsRequest {
1✔
169
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
170
    };
1✔
171
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
172
    let authorizer = Authorizer::from(&api_security);
1✔
173
    let generated_token = authorizer.generate_token(Access::All, None).unwrap();
1✔
174
    let (count_response, query_response) =
1✔
175
        test_grpc_count_and_query_common(1403, request, Some(api_security), Some(generated_token))
1✔
176
            .await
7✔
177
            .unwrap();
1✔
178
    assert_eq!(count_response.count, query_response.records.len() as u64);
1✔
179
    assert!(!query_response.records.is_empty());
1✔
180
}
1✔
181

×
182
#[tokio::test]
1✔
183
async fn test_grpc_query_with_wrong_access_token() {
1✔
184
    // create filter expression
1✔
185
    let filter = FilterExpression::Simple(
1✔
186
        "film_id".to_string(),
1✔
187
        dozer_cache::cache::expression::Operator::EQ,
1✔
188
        dozer_types::serde_json::Value::from(524),
1✔
189
    );
1✔
190

1✔
191
    let query = QueryExpression {
1✔
192
        filter: Some(filter),
1✔
193
        ..QueryExpression::default()
1✔
194
    };
1✔
195
    let request = QueryFilmsRequest {
1✔
196
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
197
    };
1✔
198
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
199
    let generated_token = "wrongrandomtoken".to_owned();
1✔
200
    let request_response =
1✔
201
        test_grpc_count_and_query_common(1404, request, Some(api_security), Some(generated_token))
1✔
202
            .await;
5✔
203
    assert!(request_response.is_err());
1✔
204
    assert!(request_response.unwrap_err().code() == Code::PermissionDenied);
1✔
205
}
1✔
206

×
207
#[tokio::test]
1✔
208
async fn test_grpc_query_empty_body() {
1✔
209
    let request = QueryFilmsRequest { query: None };
1✔
210

×
211
    let (count_response, query_response) =
1✔
212
        test_grpc_count_and_query_common(1405, request, None, None)
1✔
213
            .await
7✔
214
            .unwrap();
1✔
215
    assert_eq!(count_response.count, 52);
1✔
216
    assert_eq!(query_response.records.len(), 50);
1✔
217
}
1✔
218

×
219
#[tokio::test]
1✔
220
async fn test_typed_streaming1() {
1✔
221
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
222
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
223
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
224
        default_pipeline_internal.host,
1✔
225
        default_pipeline_internal.port,
1✔
226
        rx_internal,
1✔
227
    ));
1✔
228
    let (_tx, rx) = oneshot::channel::<()>();
1✔
229
    let _jh = tokio::spawn(async move {
1✔
230
        let typed_service = setup_typed_service(None);
1✔
231
        Server::builder()
1✔
232
            .add_service(typed_service)
1✔
233
            .serve_with_shutdown("127.0.0.1:14321".parse().unwrap(), rx.map(drop))
1✔
234
            .await
1✔
235
            .unwrap();
×
236
    });
1✔
237
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
238
    let address = "http://127.0.0.1:14321".to_owned();
1✔
239
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
240

1✔
241
    let request = FilmEventRequest {
1✔
242
        r#type: EventType::All as i32,
1✔
243
        filter: None,
1✔
244
    };
1✔
245
    let stream = client
1✔
246
        .on_event(Request::new(request))
1✔
247
        .await
2✔
248
        .unwrap()
1✔
249
        .into_inner();
1✔
250
    let mut stream = stream.take(1);
1✔
251
    while let Some(item) = stream.next().await {
2✔
252
        let response: FilmEvent = item.unwrap();
1✔
253
        assert!(response.new.is_some());
1✔
254
    }
×
255
    _ = sender_shutdown_internal.send(());
1✔
256
}
1✔
257

×
258
#[tokio::test]
1✔
259
async fn test_typed_streaming2() {
1✔
260
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
261
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
262
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
263
        default_pipeline_internal.host,
1✔
264
        default_pipeline_internal.port,
1✔
265
        rx_internal,
1✔
266
    ));
1✔
267
    let (_tx, rx) = oneshot::channel::<()>();
1✔
268
    let _jh = tokio::spawn(async move {
1✔
269
        let typed_service = setup_typed_service(None);
1✔
270
        Server::builder()
1✔
271
            .add_service(typed_service)
1✔
272
            .serve_with_shutdown("127.0.0.1:14322".parse().unwrap(), rx.map(drop))
1✔
273
            .await
1✔
274
            .unwrap();
×
275
    });
1✔
276
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
277
    let address = "http://127.0.0.1:14322".to_owned();
1✔
278
    let request = FilmEventRequest {
1✔
279
        r#type: EventType::All as i32,
1✔
280
        filter: Some(r#"{ "film_id": 32 }"#.into()),
1✔
281
    };
1✔
282
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
283
    let stream = client
1✔
284
        .on_event(Request::new(request))
1✔
285
        .await
2✔
286
        .unwrap()
1✔
287
        .into_inner();
1✔
288
    let mut stream = stream.take(1);
1✔
289
    while let Some(item) = stream.next().await {
1✔
290
        let response: FilmEvent = item.unwrap();
×
291
        assert!(response.new.is_some());
×
292
    }
×
293
    _ = sender_shutdown_internal.send(());
1✔
294
}
1✔
295

×
296
#[tokio::test]
1✔
297
async fn test_typed_streaming3() {
1✔
298
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
299
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
300
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
301
        default_pipeline_internal.host,
1✔
302
        default_pipeline_internal.port,
1✔
303
        rx_internal,
1✔
304
    ));
1✔
305
    let (_tx, rx) = oneshot::channel::<()>();
1✔
306
    let _jh = tokio::spawn(async move {
1✔
307
        let typed_service = setup_typed_service(None);
1✔
308
        Server::builder()
1✔
309
            .add_service(typed_service)
1✔
310
            .serve_with_shutdown("127.0.0.1:14323".parse().unwrap(), rx.map(drop))
1✔
311
            .await
1✔
312
            .unwrap();
×
313
    });
1✔
314
    tokio::time::sleep(Duration::from_millis(100)).await;
1✔
315
    let address = "http://127.0.0.1:14323".to_owned();
1✔
316
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
317
    let request = FilmEventRequest {
1✔
318
        r#type: EventType::All as i32,
1✔
319
        filter: Some(r#"{ "film_id": 0 }"#.into()),
1✔
320
    };
1✔
321
    let mut stream = client
1✔
322
        .on_event(Request::new(request))
1✔
323
        .await
2✔
324
        .unwrap()
1✔
325
        .into_inner();
1✔
326
    let error_timeout = timeout(Duration::from_secs(1), stream.next()).await;
1✔
327
    assert!(error_timeout.is_err() || error_timeout.unwrap().is_none());
1✔
328
    _ = sender_shutdown_internal.send(());
1✔
329
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc