• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3974044832

pending completion
3974044832

Pull #703

github

GitHub
Merge 6f30ce3b1 into 9fa836726
Pull Request #703: fix: forbid duplicated cte names

27 of 27 new or added lines in 1 file covered. (100.0%)

22298 of 33673 relevant lines covered (66.22%)

35756.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.93
/dozer-api/src/grpc/typed/tests/service.rs
1
use crate::{
2
    auth::{Access, Authorizer},
3
    generator::protoc::utils::get_proto_descriptor,
4
    grpc::{
5
        auth_middleware::AuthMiddlewareLayer,
6
        client_server::ApiServer,
7
        internal_grpc::PipelineResponse,
8
        typed::{
9
            tests::{
10
                fake_internal_pipeline_server::start_fake_internal_grpc_pipeline,
11
                generated::films::{
12
                    films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
13
                    QueryFilmsResponse,
14
                },
15
            },
16
            TypedService,
17
        },
18
    },
19
    CacheEndpoint, PipelineDetails,
20
};
21
use dozer_cache::cache::expression::{FilterExpression, QueryExpression};
22
use dozer_types::{
23
    models::{api_config::default_api_config, api_security::ApiSecurity},
24
    types::Schema,
25
};
26
use futures_util::FutureExt;
27
use std::{collections::HashMap, env, path::PathBuf, str::FromStr, time::Duration};
28

29
use super::{generated::films::FilmEventRequest, types::EventType};
30
use crate::test_utils;
31
use tokio::{
32
    sync::{
33
        broadcast::{self, Receiver},
34
        oneshot,
35
    },
36
    time::timeout,
37
};
38
use tokio_stream::StreamExt;
39
use tonic::{
40
    metadata::MetadataValue,
41
    transport::{Endpoint, Server},
42
    Code, Request,
43
};
44

45
pub fn setup_pipeline() -> (
11✔
46
    HashMap<String, PipelineDetails>,
11✔
47
    HashMap<String, Schema>,
11✔
48
    Receiver<PipelineResponse>,
11✔
49
) {
11✔
50
    let schema_name = String::from("films");
11✔
51
    let (schema, _) = test_utils::get_schema();
11✔
52
    let endpoint = test_utils::get_endpoint();
11✔
53
    let pipeline_details = PipelineDetails {
11✔
54
        schema_name: schema_name.clone(),
11✔
55
        cache_endpoint: CacheEndpoint {
11✔
56
            cache: test_utils::initialize_cache(&schema_name, None),
11✔
57
            endpoint,
11✔
58
        },
11✔
59
    };
11✔
60

11✔
61
    let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
11✔
62
    let default_api_internal = default_api_config().pipeline_internal.unwrap_or_default();
11✔
63
    tokio::spawn(async {
11✔
64
        ApiServer::setup_broad_cast_channel(tx, default_api_internal)
8✔
65
            .await
48✔
66
            .unwrap();
4✔
67
    });
11✔
68
    let mut pipeline_map = HashMap::new();
11✔
69
    pipeline_map.insert("films".to_string(), pipeline_details);
11✔
70

11✔
71
    let mut schema_map = HashMap::new();
11✔
72
    schema_map.insert("films".to_string(), schema);
11✔
73

11✔
74
    (pipeline_map, schema_map, rx1)
11✔
75
}
11✔
76

77
fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
7✔
78
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
7✔
79

7✔
80
    let path = out_dir.join("generated_films.bin");
7✔
81

7✔
82
    let (_, desc) = get_proto_descriptor(&path).unwrap();
7✔
83

7✔
84
    let (pipeline_map, schema_map, rx1) = setup_pipeline();
7✔
85

7✔
86
    TypedService::new(desc, pipeline_map, schema_map, Some(rx1), security)
7✔
87
}
7✔
88

89
async fn test_grpc_count_and_query_common(
4✔
90
    port: u32,
4✔
91
    request: QueryFilmsRequest,
4✔
92
    api_security: Option<ApiSecurity>,
4✔
93
    access_token: Option<String>,
4✔
94
) -> Result<(CountFilmsResponse, QueryFilmsResponse), tonic::Status> {
4✔
95
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
4✔
96
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
4✔
97
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
4✔
98
        default_pipeline_internal.host,
4✔
99
        default_pipeline_internal.port,
4✔
100
        rx_internal,
4✔
101
    ));
4✔
102

4✔
103
    let typed_service = setup_typed_service(api_security.to_owned());
4✔
104
    let (_tx, rx) = oneshot::channel::<()>();
4✔
105
    // middleware
4✔
106
    let layer = tower::ServiceBuilder::new()
4✔
107
        .layer(AuthMiddlewareLayer::new(api_security.to_owned()))
4✔
108
        .into_inner();
4✔
109
    let _jh = tokio::spawn(async move {
4✔
110
        Server::builder()
4✔
111
            .layer(layer)
4✔
112
            .add_service(typed_service)
4✔
113
            .serve_with_shutdown(
4✔
114
                format!("127.0.0.1:{:}", port).parse().unwrap(),
4✔
115
                rx.map(drop),
4✔
116
            )
4✔
117
            .await
4✔
118
            .unwrap();
×
119
    });
4✔
120
    tokio::time::sleep(Duration::from_millis(100)).await;
4✔
121
    let channel = Endpoint::from_str(&format!("http://127.0.0.1:{:}", port))
4✔
122
        .unwrap()
4✔
123
        .connect()
4✔
124
        .await
8✔
125
        .unwrap();
4✔
126
    if api_security.is_some() {
4✔
127
        let my_token = access_token.unwrap_or_default();
2✔
128
        let mut client = FilmsClient::with_interceptor(channel, move |mut req: Request<()>| {
3✔
129
            let token: MetadataValue<_> = format!("Bearer {:}", my_token).parse().unwrap();
3✔
130
            req.metadata_mut().insert("authorization", token);
3✔
131
            Ok(req)
3✔
132
        });
3✔
133
        let res = client.count(Request::new(request.clone())).await?;
4✔
134
        let count_response = res.into_inner();
1✔
135
        let res = client.query(Request::new(request)).await?;
2✔
136
        let query_response = res.into_inner();
1✔
137
        _ = sender_shutdown_internal.send(());
1✔
138
        Ok((count_response, query_response))
1✔
139
    } else {
140
        let mut client = FilmsClient::new(channel);
2✔
141
        let res = client.count(Request::new(request.clone())).await?;
4✔
142
        let count_response = res.into_inner();
2✔
143
        let res = client.query(Request::new(request)).await?;
4✔
144
        let query_response = res.into_inner();
2✔
145
        _ = sender_shutdown_internal.send(());
2✔
146
        Ok((count_response, query_response))
2✔
147
    }
148
}
4✔
149

150
#[tokio::test]
1✔
151
async fn test_grpc_query() {
1✔
152
    // create filter expression
1✔
153
    let filter = FilterExpression::Simple(
1✔
154
        "film_id".to_string(),
1✔
155
        dozer_cache::cache::expression::Operator::EQ,
1✔
156
        dozer_types::serde_json::Value::from(524),
1✔
157
    );
1✔
158

1✔
159
    let query = QueryExpression {
1✔
160
        filter: Some(filter),
1✔
161
        ..QueryExpression::default()
1✔
162
    };
1✔
163
    let request = QueryFilmsRequest {
1✔
164
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
165
    };
1✔
166

167
    let (count_response, query_response) =
1✔
168
        test_grpc_count_and_query_common(1402, request, None, None)
1✔
169
            .await
7✔
170
            .unwrap();
1✔
171
    assert_eq!(count_response.count, query_response.data.len() as u64);
1✔
172
    assert!(!query_response.data.len() > 0);
1✔
173
}
1✔
174

175
#[tokio::test]
1✔
176
async fn test_grpc_query_with_access_token() {
1✔
177
    // create filter expression
1✔
178
    let filter = FilterExpression::Simple(
1✔
179
        "film_id".to_string(),
1✔
180
        dozer_cache::cache::expression::Operator::EQ,
1✔
181
        dozer_types::serde_json::Value::from(524),
1✔
182
    );
1✔
183

1✔
184
    let query = QueryExpression {
1✔
185
        filter: Some(filter),
1✔
186
        ..QueryExpression::default()
1✔
187
    };
1✔
188
    let request = QueryFilmsRequest {
1✔
189
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
190
    };
1✔
191
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
192
    let authorizer = Authorizer::from(&api_security);
1✔
193
    let generated_token = authorizer.generate_token(Access::All, None).unwrap();
1✔
194
    let (count_response, query_response) =
1✔
195
        test_grpc_count_and_query_common(1403, request, Some(api_security), Some(generated_token))
1✔
196
            .await
7✔
197
            .unwrap();
1✔
198
    assert_eq!(count_response.count, query_response.data.len() as u64);
1✔
199
    assert!(!query_response.data.is_empty());
1✔
200
}
1✔
201

202
#[tokio::test]
1✔
203
async fn test_grpc_query_with_wrong_access_token() {
1✔
204
    // create filter expression
1✔
205
    let filter = FilterExpression::Simple(
1✔
206
        "film_id".to_string(),
1✔
207
        dozer_cache::cache::expression::Operator::EQ,
1✔
208
        dozer_types::serde_json::Value::from(524),
1✔
209
    );
1✔
210

1✔
211
    let query = QueryExpression {
1✔
212
        filter: Some(filter),
1✔
213
        ..QueryExpression::default()
1✔
214
    };
1✔
215
    let request = QueryFilmsRequest {
1✔
216
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
217
    };
1✔
218
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
219
    let generated_token = "wrongrandomtoken".to_owned();
1✔
220
    let request_response =
1✔
221
        test_grpc_count_and_query_common(1404, request, Some(api_security), Some(generated_token))
1✔
222
            .await;
5✔
223
    assert!(request_response.is_err());
1✔
224
    assert!(request_response.unwrap_err().code() == Code::PermissionDenied);
1✔
225
}
1✔
226

227
#[tokio::test]
1✔
228
async fn test_grpc_query_empty_body() {
1✔
229
    let request = QueryFilmsRequest { query: None };
1✔
230

231
    let (count_response, query_response) =
1✔
232
        test_grpc_count_and_query_common(1405, request, None, None)
1✔
233
            .await
7✔
234
            .unwrap();
1✔
235
    assert_eq!(count_response.count, 52);
1✔
236
    assert_eq!(query_response.data.len(), 50);
1✔
237
}
1✔
238

239
#[tokio::test]
1✔
240
async fn test_typed_streaming1() {
1✔
241
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
242
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
243
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
244
        default_pipeline_internal.host,
1✔
245
        default_pipeline_internal.port,
1✔
246
        rx_internal,
1✔
247
    ));
1✔
248
    let (_tx, rx) = oneshot::channel::<()>();
1✔
249
    let _jh = tokio::spawn(async move {
1✔
250
        let typed_service = setup_typed_service(None);
1✔
251
        Server::builder()
1✔
252
            .add_service(typed_service)
1✔
253
            .serve_with_shutdown("127.0.0.1:14321".parse().unwrap(), rx.map(drop))
1✔
254
            .await
1✔
255
            .unwrap();
×
256
    });
1✔
257
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
258
    let address = "http://127.0.0.1:14321".to_owned();
1✔
259
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
260

1✔
261
    let request = FilmEventRequest {
1✔
262
        r#type: EventType::All as i32,
1✔
263
        filter: None,
1✔
264
    };
1✔
265
    let stream = client
1✔
266
        .on_event(Request::new(request))
1✔
267
        .await
2✔
268
        .unwrap()
1✔
269
        .into_inner();
1✔
270
    let mut stream = stream.take(1);
1✔
271
    while let Some(item) = stream.next().await {
1✔
272
        let response: FilmEvent = item.unwrap();
×
273
        assert!(response.new.is_some());
×
274
    }
275
    _ = sender_shutdown_internal.send(());
1✔
276
}
1✔
277

278
#[tokio::test]
1✔
279
async fn test_typed_streaming2() {
1✔
280
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
281
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
282
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
283
        default_pipeline_internal.host,
1✔
284
        default_pipeline_internal.port,
1✔
285
        rx_internal,
1✔
286
    ));
1✔
287
    let (_tx, rx) = oneshot::channel::<()>();
1✔
288
    let _jh = tokio::spawn(async move {
1✔
289
        let typed_service = setup_typed_service(None);
1✔
290
        Server::builder()
1✔
291
            .add_service(typed_service)
1✔
292
            .serve_with_shutdown("127.0.0.1:14322".parse().unwrap(), rx.map(drop))
1✔
293
            .await
1✔
294
            .unwrap();
×
295
    });
1✔
296
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
297
    let address = "http://127.0.0.1:14322".to_owned();
1✔
298
    let request = FilmEventRequest {
1✔
299
        r#type: EventType::All as i32,
1✔
300
        filter: Some(r#"{ "film_id": 32 }"#.into()),
1✔
301
    };
1✔
302
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
303
    let stream = client
1✔
304
        .on_event(Request::new(request))
1✔
305
        .await
2✔
306
        .unwrap()
1✔
307
        .into_inner();
1✔
308
    let mut stream = stream.take(1);
1✔
309
    while let Some(item) = stream.next().await {
2✔
310
        let response: FilmEvent = item.unwrap();
1✔
311
        assert!(response.new.is_some());
1✔
312
    }
313
    _ = sender_shutdown_internal.send(());
1✔
314
}
1✔
315

316
#[tokio::test]
1✔
317
async fn test_typed_streaming3() {
1✔
318
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
319
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
320
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
321
        default_pipeline_internal.host,
1✔
322
        default_pipeline_internal.port,
1✔
323
        rx_internal,
1✔
324
    ));
1✔
325
    let (_tx, rx) = oneshot::channel::<()>();
1✔
326
    let _jh = tokio::spawn(async move {
1✔
327
        let typed_service = setup_typed_service(None);
1✔
328
        Server::builder()
1✔
329
            .add_service(typed_service)
1✔
330
            .serve_with_shutdown("127.0.0.1:14323".parse().unwrap(), rx.map(drop))
1✔
331
            .await
1✔
332
            .unwrap();
×
333
    });
1✔
334
    tokio::time::sleep(Duration::from_millis(100)).await;
1✔
335
    let address = "http://127.0.0.1:14323".to_owned();
1✔
336
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
337
    let request = FilmEventRequest {
1✔
338
        r#type: EventType::All as i32,
1✔
339
        filter: Some(r#"{ "film_id": 0 }"#.into()),
1✔
340
    };
1✔
341
    let mut stream = client
1✔
342
        .on_event(Request::new(request))
1✔
343
        .await
2✔
344
        .unwrap()
1✔
345
        .into_inner();
1✔
346
    let error_timeout = timeout(Duration::from_secs(1), stream.next()).await;
1✔
347
    assert!(error_timeout.is_err() || error_timeout.unwrap().is_none());
1✔
348
    _ = sender_shutdown_internal.send(());
1✔
349
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc