• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829268814

pending completion
4829268814

Pull #1516

github

GitHub
Merge 845b68ec1 into f2ab0e6ce
Pull Request #1516: Prepare v0.1.19

35090 of 44737 relevant lines covered (78.44%)

11496.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.98
/dozer-api/src/grpc/typed/tests/service.rs
1
use crate::{
2
    auth::{Access, Authorizer},
3
    errors::GrpcError,
4
    grpc::{
5
        auth_middleware::AuthMiddlewareLayer,
6
        internal::internal_pipeline_client::InternalPipelineClient,
7
        typed::{
8
            tests::fake_internal_pipeline_server::start_fake_internal_grpc_pipeline, TypedService,
9
        },
10
    },
11
    CacheEndpoint,
12
};
13
use dozer_cache::cache::expression::{FilterExpression, QueryExpression};
14
use dozer_types::grpc_types::{
15
    generated::films::FilmEventRequest,
16
    generated::films::{
17
        films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
18
        QueryFilmsResponse,
19
    },
20
    types::{EventType, Operation},
21
};
22
use dozer_types::models::{api_config::default_api_config, api_security::ApiSecurity};
23
use futures_util::FutureExt;
24
use std::{env, str::FromStr, sync::Arc, time::Duration};
25

26
use crate::test_utils;
27
use tokio::{
28
    sync::{
29
        broadcast::{self, Receiver},
30
        oneshot,
31
    },
32
    time::timeout,
33
};
34
use tokio_stream::StreamExt;
35
use tonic::{
36
    metadata::MetadataValue,
37
    transport::{Endpoint, Server},
38
    Code, Request,
39
};
40

41
async fn start_internal_pipeline_client() -> Result<Receiver<Operation>, GrpcError> {
11✔
42
    let default_api_internal = default_api_config().app_grpc.unwrap_or_default();
11✔
43
    let mut client = InternalPipelineClient::new(&default_api_internal).await?;
22✔
44
    let (receiver, future) = client.stream_operations().await?;
22✔
45
    tokio::spawn(future);
11✔
46
    Ok(receiver)
11✔
47
}
11✔
48

49
pub async fn setup_pipeline() -> (Vec<Arc<CacheEndpoint>>, Receiver<Operation>) {
11✔
50
    // Copy this file from dozer-tests output directory if it changes
11✔
51
    let res = env::current_dir().unwrap();
11✔
52
    let descriptor_path = res.join("src/grpc/typed/tests/generated_films.bin");
11✔
53
    let endpoint = test_utils::get_endpoint();
11✔
54
    let cache_endpoint = CacheEndpoint::open(
11✔
55
        &*test_utils::initialize_cache(&endpoint.name, None),
11✔
56
        descriptor_path,
11✔
57
        endpoint,
11✔
58
    )
11✔
59
    .unwrap();
11✔
60

61
    let receiver = start_internal_pipeline_client()
11✔
62
        .await
44✔
63
        .unwrap_or(broadcast::channel::<Operation>(1).1);
11✔
64

11✔
65
    (vec![Arc::new(cache_endpoint)], receiver)
11✔
66
}
11✔
67

68
async fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
7✔
69
    let (endpoints, rx1) = setup_pipeline().await;
28✔
70

71
    TypedService::new(endpoints, Some(rx1), security).unwrap()
7✔
72
}
7✔
73

74
async fn test_grpc_count_and_query_common(
4✔
75
    port: u32,
4✔
76
    request: QueryFilmsRequest,
4✔
77
    api_security: Option<ApiSecurity>,
4✔
78
    access_token: Option<String>,
4✔
79
) -> Result<(CountFilmsResponse, QueryFilmsResponse), tonic::Status> {
4✔
80
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
4✔
81
    let default_pipeline_internal = default_api_config().app_grpc.unwrap_or_default();
4✔
82
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
4✔
83
        default_pipeline_internal.host,
4✔
84
        default_pipeline_internal.port,
4✔
85
        rx_internal,
4✔
86
    ));
4✔
87

88
    let typed_service = setup_typed_service(api_security.to_owned()).await;
16✔
89
    let (_tx, rx) = oneshot::channel::<()>();
4✔
90
    // middleware
4✔
91
    let layer = tower::ServiceBuilder::new()
4✔
92
        .layer(AuthMiddlewareLayer::new(api_security.to_owned()))
4✔
93
        .into_inner();
4✔
94
    let _jh = tokio::spawn(async move {
4✔
95
        Server::builder()
4✔
96
            .layer(layer)
4✔
97
            .add_service(typed_service)
4✔
98
            .serve_with_shutdown(format!("127.0.0.1:{port:}").parse().unwrap(), rx.map(drop))
4✔
99
            .await
4✔
100
            .unwrap();
×
101
    });
4✔
102
    tokio::time::sleep(Duration::from_millis(1001)).await;
4✔
103
    let channel = Endpoint::from_str(&format!("http://127.0.0.1:{port:}"))
4✔
104
        .unwrap()
4✔
105
        .connect()
4✔
106
        .await
8✔
107
        .unwrap();
4✔
108
    if api_security.is_some() {
4✔
109
        let my_token = access_token.unwrap_or_default();
2✔
110
        let mut client = FilmsClient::with_interceptor(channel, move |mut req: Request<()>| {
3✔
111
            let token: MetadataValue<_> = format!("Bearer {my_token:}").parse().unwrap();
3✔
112
            req.metadata_mut().insert("authorization", token);
3✔
113
            Ok(req)
3✔
114
        });
3✔
115
        let res = client.count(Request::new(request.clone())).await?;
4✔
116
        let count_response = res.into_inner();
1✔
117
        let res = client.query(Request::new(request)).await?;
2✔
118
        let query_response = res.into_inner();
1✔
119
        _ = sender_shutdown_internal.send(());
1✔
120
        Ok((count_response, query_response))
1✔
121
    } else {
122
        let mut client = FilmsClient::new(channel);
2✔
123
        let res = client.count(Request::new(request.clone())).await?;
4✔
124
        let count_response = res.into_inner();
2✔
125
        let res = client.query(Request::new(request)).await?;
4✔
126
        let query_response = res.into_inner();
2✔
127
        _ = sender_shutdown_internal.send(());
2✔
128
        Ok((count_response, query_response))
2✔
129
    }
130
}
4✔
131

132
#[tokio::test]
1✔
133
async fn test_grpc_query() {
1✔
134
    // create filter expression
1✔
135
    let filter = FilterExpression::Simple(
1✔
136
        "film_id".to_string(),
1✔
137
        dozer_cache::cache::expression::Operator::EQ,
1✔
138
        dozer_types::serde_json::Value::from(524),
1✔
139
    );
1✔
140

1✔
141
    let query = QueryExpression {
1✔
142
        filter: Some(filter),
1✔
143
        ..QueryExpression::default()
1✔
144
    };
1✔
145
    let request = QueryFilmsRequest {
1✔
146
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
147
    };
1✔
148

149
    let (count_response, query_response) =
1✔
150
        test_grpc_count_and_query_common(1402, request, None, None)
1✔
151
            .await
11✔
152
            .unwrap();
1✔
153
    assert_eq!(count_response.count, query_response.records.len() as u64);
1✔
154
    assert!(!query_response.records.len() > 0);
1✔
155
}
156

157
#[tokio::test]
1✔
158
async fn test_grpc_query_with_access_token() {
1✔
159
    // create filter expression
1✔
160
    let filter = FilterExpression::Simple(
1✔
161
        "film_id".to_string(),
1✔
162
        dozer_cache::cache::expression::Operator::EQ,
1✔
163
        dozer_types::serde_json::Value::from(524),
1✔
164
    );
1✔
165

1✔
166
    let query = QueryExpression {
1✔
167
        filter: Some(filter),
1✔
168
        ..QueryExpression::default()
1✔
169
    };
1✔
170
    let request = QueryFilmsRequest {
1✔
171
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
172
    };
1✔
173
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
174
    let authorizer = Authorizer::from(&api_security);
1✔
175
    let generated_token = authorizer.generate_token(Access::All, None).unwrap();
1✔
176
    let (count_response, query_response) =
1✔
177
        test_grpc_count_and_query_common(1403, request, Some(api_security), Some(generated_token))
1✔
178
            .await
11✔
179
            .unwrap();
1✔
180
    assert_eq!(count_response.count, query_response.records.len() as u64);
1✔
181
    assert!(!query_response.records.is_empty());
1✔
182
}
183

184
#[tokio::test]
1✔
185
async fn test_grpc_query_with_wrong_access_token() {
1✔
186
    // create filter expression
1✔
187
    let filter = FilterExpression::Simple(
1✔
188
        "film_id".to_string(),
1✔
189
        dozer_cache::cache::expression::Operator::EQ,
1✔
190
        dozer_types::serde_json::Value::from(524),
1✔
191
    );
1✔
192

1✔
193
    let query = QueryExpression {
1✔
194
        filter: Some(filter),
1✔
195
        ..QueryExpression::default()
1✔
196
    };
1✔
197
    let request = QueryFilmsRequest {
1✔
198
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
199
    };
1✔
200
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
201
    let generated_token = "wrongrandomtoken".to_owned();
1✔
202
    let request_response =
1✔
203
        test_grpc_count_and_query_common(1404, request, Some(api_security), Some(generated_token))
1✔
204
            .await;
9✔
205
    assert!(request_response.is_err());
1✔
206
    assert!(request_response.unwrap_err().code() == Code::PermissionDenied);
1✔
207
}
208

209
#[tokio::test]
1✔
210
async fn test_grpc_query_empty_body() {
1✔
211
    let request = QueryFilmsRequest { query: None };
1✔
212

213
    let (count_response, query_response) =
1✔
214
        test_grpc_count_and_query_common(1405, request, None, None)
1✔
215
            .await
11✔
216
            .unwrap();
1✔
217
    assert_eq!(count_response.count, 52);
1✔
218
    assert_eq!(query_response.records.len(), 50);
1✔
219
}
220

221
#[tokio::test]
1✔
222
async fn test_typed_streaming1() {
1✔
223
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
224
    let default_pipeline_internal = default_api_config().app_grpc.unwrap_or_default();
1✔
225
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
226
        default_pipeline_internal.host,
1✔
227
        default_pipeline_internal.port,
1✔
228
        rx_internal,
1✔
229
    ));
1✔
230
    let (_tx, rx) = oneshot::channel::<()>();
1✔
231
    let _jh = tokio::spawn(async move {
1✔
232
        let typed_service = setup_typed_service(None).await;
4✔
233
        Server::builder()
1✔
234
            .add_service(typed_service)
1✔
235
            .serve_with_shutdown("127.0.0.1:14321".parse().unwrap(), rx.map(drop))
1✔
236
            .await
1✔
237
            .unwrap();
×
238
    });
1✔
239
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
240
    let address = "http://127.0.0.1:14321".to_owned();
1✔
241
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
242

1✔
243
    let request = FilmEventRequest {
1✔
244
        r#type: EventType::All as i32,
1✔
245
        filter: None,
1✔
246
    };
1✔
247
    let stream = client
1✔
248
        .on_event(Request::new(request))
1✔
249
        .await
2✔
250
        .unwrap()
1✔
251
        .into_inner();
1✔
252
    let mut stream = stream.take(1);
1✔
253
    while let Some(item) = stream.next().await {
1✔
254
        let response: FilmEvent = item.unwrap();
×
255
        assert!(response.new.is_some());
×
256
    }
257
    _ = sender_shutdown_internal.send(());
1✔
258
}
259

260
#[tokio::test]
1✔
261
async fn test_typed_streaming2() {
1✔
262
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
263
    let default_pipeline_internal = default_api_config().app_grpc.unwrap_or_default();
1✔
264
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
265
        default_pipeline_internal.host,
1✔
266
        default_pipeline_internal.port,
1✔
267
        rx_internal,
1✔
268
    ));
1✔
269
    let (_tx, rx) = oneshot::channel::<()>();
1✔
270
    let _jh = tokio::spawn(async move {
1✔
271
        let typed_service = setup_typed_service(None).await;
4✔
272
        Server::builder()
1✔
273
            .add_service(typed_service)
1✔
274
            .serve_with_shutdown("127.0.0.1:14322".parse().unwrap(), rx.map(drop))
1✔
275
            .await
1✔
276
            .unwrap();
×
277
    });
1✔
278
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
279
    let address = "http://127.0.0.1:14322".to_owned();
1✔
280
    let request = FilmEventRequest {
1✔
281
        r#type: EventType::All as i32,
1✔
282
        filter: Some(r#"{ "film_id": 32 }"#.into()),
1✔
283
    };
1✔
284
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
285
    let stream = client
1✔
286
        .on_event(Request::new(request))
1✔
287
        .await
2✔
288
        .unwrap()
1✔
289
        .into_inner();
1✔
290
    let mut stream = stream.take(1);
1✔
291
    while let Some(item) = stream.next().await {
1✔
292
        let response: FilmEvent = item.unwrap();
×
293
        assert!(response.new.is_some());
×
294
    }
295
    _ = sender_shutdown_internal.send(());
1✔
296
}
297

298
#[tokio::test]
1✔
299
async fn test_typed_streaming3() {
1✔
300
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
301
    let default_pipeline_internal = default_api_config().app_grpc.unwrap_or_default();
1✔
302
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
303
        default_pipeline_internal.host,
1✔
304
        default_pipeline_internal.port,
1✔
305
        rx_internal,
1✔
306
    ));
1✔
307
    let (_tx, rx) = oneshot::channel::<()>();
1✔
308
    let _jh = tokio::spawn(async move {
1✔
309
        let typed_service = setup_typed_service(None).await;
4✔
310
        Server::builder()
1✔
311
            .add_service(typed_service)
1✔
312
            .serve_with_shutdown("127.0.0.1:14323".parse().unwrap(), rx.map(drop))
1✔
313
            .await
1✔
314
            .unwrap();
×
315
    });
1✔
316
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
317
    let address = "http://127.0.0.1:14323".to_owned();
1✔
318
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
319
    let request = FilmEventRequest {
1✔
320
        r#type: EventType::All as i32,
1✔
321
        filter: Some(r#"{ "film_id": 0 }"#.into()),
1✔
322
    };
1✔
323
    let mut stream = client
1✔
324
        .on_event(Request::new(request))
1✔
325
        .await
2✔
326
        .unwrap()
1✔
327
        .into_inner();
1✔
328
    let error_timeout = timeout(Duration::from_secs(1), stream.next()).await;
1✔
329
    assert!(error_timeout.is_err() || error_timeout.unwrap().is_none());
1✔
330
    _ = sender_shutdown_internal.send(());
1✔
331
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc