• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.91
/dozer-api/src/grpc/typed/tests/service.rs
1
use crate::{
2
    auth::{Access, Authorizer},
3
    generator::protoc::utils::get_proto_descriptor,
4
    grpc::{
5
        auth_middleware::AuthMiddlewareLayer,
6
        client_server::ApiServer,
7
        internal_grpc::PipelineResponse,
8
        typed::{
9
            tests::{
10
                fake_internal_pipeline_server::start_fake_internal_grpc_pipeline,
11
                generated::films::{
12
                    films_client::FilmsClient, CountFilmsResponse, FilmEvent, QueryFilmsRequest,
13
                    QueryFilmsResponse,
14
                },
15
            },
16
            TypedService,
17
        },
18
    },
19
    CacheEndpoint, PipelineDetails,
20
};
21
use dozer_cache::cache::expression::{FilterExpression, QueryExpression};
22
use dozer_types::{
23
    models::{api_config::default_api_config, api_security::ApiSecurity},
24
    types::Schema,
25
};
26
use futures_util::FutureExt;
27
use std::{collections::HashMap, env, path::PathBuf, str::FromStr, time::Duration};
28

29
use super::{generated::films::FilmEventRequest, types::EventType};
30
use crate::test_utils;
31
use tokio::{
32
    sync::{
33
        broadcast::{self, Receiver},
34
        oneshot,
35
    },
36
    time::timeout,
37
};
38
use tokio_stream::StreamExt;
39
use tonic::{
40
    metadata::MetadataValue,
41
    transport::{Endpoint, Server},
42
    Code, Request,
43
};
44

45
pub fn setup_pipeline() -> (
11✔
46
    HashMap<String, PipelineDetails>,
11✔
47
    HashMap<String, Schema>,
11✔
48
    Receiver<PipelineResponse>,
11✔
49
) {
11✔
50
    let schema_name = String::from("films");
11✔
51
    let (schema, _) = test_utils::get_schema();
11✔
52
    let endpoint = test_utils::get_endpoint();
11✔
53
    let pipeline_details = PipelineDetails {
11✔
54
        schema_name: schema_name.clone(),
11✔
55
        cache_endpoint: CacheEndpoint {
11✔
56
            cache: test_utils::initialize_cache(&schema_name, None),
11✔
57
            endpoint,
11✔
58
        },
11✔
59
    };
11✔
60

11✔
61
    let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);
11✔
62
    let default_api_internal = default_api_config().pipeline_internal.unwrap_or_default();
11✔
63
    tokio::spawn(async {
11✔
64
        ApiServer::setup_broad_cast_channel(tx, default_api_internal)
8✔
65
            .await
48✔
66
            .unwrap();
4✔
67
    });
11✔
68
    let mut pipeline_map = HashMap::new();
11✔
69
    pipeline_map.insert("films".to_string(), pipeline_details);
11✔
70

11✔
71
    let mut schema_map = HashMap::new();
11✔
72
    schema_map.insert("films".to_string(), schema);
11✔
73

11✔
74
    (pipeline_map, schema_map, rx1)
11✔
75
}
11✔
76

77
fn setup_typed_service(security: Option<ApiSecurity>) -> TypedService {
7✔
78
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
7✔
79

7✔
80
    let path = out_dir.join("generated_films.bin");
7✔
81

7✔
82
    let (_, desc) = get_proto_descriptor(&path).unwrap();
7✔
83

7✔
84
    let (pipeline_map, schema_map, rx1) = setup_pipeline();
7✔
85

7✔
86
    TypedService::new(desc, pipeline_map, schema_map, Some(rx1), security)
7✔
87
}
7✔
88

89
async fn test_grpc_count_and_query_common(
4✔
90
    port: u32,
4✔
91
    request: QueryFilmsRequest,
4✔
92
    api_security: Option<ApiSecurity>,
4✔
93
    access_token: Option<String>,
4✔
94
) -> Result<(CountFilmsResponse, QueryFilmsResponse), tonic::Status> {
4✔
95
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
4✔
96
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
4✔
97
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
4✔
98
        default_pipeline_internal.host,
4✔
99
        default_pipeline_internal.port,
4✔
100
        rx_internal,
4✔
101
    ));
4✔
102

4✔
103
    let typed_service = setup_typed_service(api_security.to_owned());
4✔
104
    let (_tx, rx) = oneshot::channel::<()>();
4✔
105
    // middleware
4✔
106
    let layer = tower::ServiceBuilder::new()
4✔
107
        .layer(AuthMiddlewareLayer::new(api_security.to_owned()))
4✔
108
        .into_inner();
4✔
109
    let _jh = tokio::spawn(async move {
4✔
110
        Server::builder()
4✔
111
            .layer(layer)
4✔
112
            .add_service(typed_service)
4✔
113
            .serve_with_shutdown(format!("127.0.0.1:{port:}").parse().unwrap(), rx.map(drop))
4✔
114
            .await
4✔
115
            .unwrap();
×
116
    });
4✔
117
    tokio::time::sleep(Duration::from_millis(100)).await;
4✔
118
    let channel = Endpoint::from_str(&format!("http://127.0.0.1:{port:}"))
4✔
119
        .unwrap()
4✔
120
        .connect()
4✔
121
        .await
8✔
122
        .unwrap();
4✔
123
    if api_security.is_some() {
4✔
124
        let my_token = access_token.unwrap_or_default();
2✔
125
        let mut client = FilmsClient::with_interceptor(channel, move |mut req: Request<()>| {
3✔
126
            let token: MetadataValue<_> = format!("Bearer {my_token:}").parse().unwrap();
3✔
127
            req.metadata_mut().insert("authorization", token);
3✔
128
            Ok(req)
3✔
129
        });
3✔
130
        let res = client.count(Request::new(request.clone())).await?;
4✔
131
        let count_response = res.into_inner();
1✔
132
        let res = client.query(Request::new(request)).await?;
2✔
133
        let query_response = res.into_inner();
1✔
134
        _ = sender_shutdown_internal.send(());
1✔
135
        Ok((count_response, query_response))
1✔
136
    } else {
137
        let mut client = FilmsClient::new(channel);
2✔
138
        let res = client.count(Request::new(request.clone())).await?;
4✔
139
        let count_response = res.into_inner();
2✔
140
        let res = client.query(Request::new(request)).await?;
4✔
141
        let query_response = res.into_inner();
2✔
142
        _ = sender_shutdown_internal.send(());
2✔
143
        Ok((count_response, query_response))
2✔
144
    }
145
}
4✔
146

147
#[tokio::test]
1✔
148
async fn test_grpc_query() {
1✔
149
    // create filter expression
1✔
150
    let filter = FilterExpression::Simple(
1✔
151
        "film_id".to_string(),
1✔
152
        dozer_cache::cache::expression::Operator::EQ,
1✔
153
        dozer_types::serde_json::Value::from(524),
1✔
154
    );
1✔
155

1✔
156
    let query = QueryExpression {
1✔
157
        filter: Some(filter),
1✔
158
        ..QueryExpression::default()
1✔
159
    };
1✔
160
    let request = QueryFilmsRequest {
1✔
161
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
162
    };
1✔
163

164
    let (count_response, query_response) =
1✔
165
        test_grpc_count_and_query_common(1402, request, None, None)
1✔
166
            .await
7✔
167
            .unwrap();
1✔
168
    assert_eq!(count_response.count, query_response.data.len() as u64);
1✔
169
    assert!(!query_response.data.len() > 0);
1✔
170
}
1✔
171

172
#[tokio::test]
1✔
173
async fn test_grpc_query_with_access_token() {
1✔
174
    // create filter expression
1✔
175
    let filter = FilterExpression::Simple(
1✔
176
        "film_id".to_string(),
1✔
177
        dozer_cache::cache::expression::Operator::EQ,
1✔
178
        dozer_types::serde_json::Value::from(524),
1✔
179
    );
1✔
180

1✔
181
    let query = QueryExpression {
1✔
182
        filter: Some(filter),
1✔
183
        ..QueryExpression::default()
1✔
184
    };
1✔
185
    let request = QueryFilmsRequest {
1✔
186
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
187
    };
1✔
188
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
189
    let authorizer = Authorizer::from(&api_security);
1✔
190
    let generated_token = authorizer.generate_token(Access::All, None).unwrap();
1✔
191
    let (count_response, query_response) =
1✔
192
        test_grpc_count_and_query_common(1403, request, Some(api_security), Some(generated_token))
1✔
193
            .await
7✔
194
            .unwrap();
1✔
195
    assert_eq!(count_response.count, query_response.data.len() as u64);
1✔
196
    assert!(!query_response.data.is_empty());
1✔
197
}
1✔
198

199
#[tokio::test]
1✔
200
async fn test_grpc_query_with_wrong_access_token() {
1✔
201
    // create filter expression
1✔
202
    let filter = FilterExpression::Simple(
1✔
203
        "film_id".to_string(),
1✔
204
        dozer_cache::cache::expression::Operator::EQ,
1✔
205
        dozer_types::serde_json::Value::from(524),
1✔
206
    );
1✔
207

1✔
208
    let query = QueryExpression {
1✔
209
        filter: Some(filter),
1✔
210
        ..QueryExpression::default()
1✔
211
    };
1✔
212
    let request = QueryFilmsRequest {
1✔
213
        query: Some(dozer_types::serde_json::to_string(&query).unwrap()),
1✔
214
    };
1✔
215
    let api_security = ApiSecurity::Jwt("DXkzrlnTy6".to_owned());
1✔
216
    let generated_token = "wrongrandomtoken".to_owned();
1✔
217
    let request_response =
1✔
218
        test_grpc_count_and_query_common(1404, request, Some(api_security), Some(generated_token))
1✔
219
            .await;
5✔
220
    assert!(request_response.is_err());
1✔
221
    assert!(request_response.unwrap_err().code() == Code::PermissionDenied);
1✔
222
}
1✔
223

224
#[tokio::test]
1✔
225
async fn test_grpc_query_empty_body() {
1✔
226
    let request = QueryFilmsRequest { query: None };
1✔
227

228
    let (count_response, query_response) =
1✔
229
        test_grpc_count_and_query_common(1405, request, None, None)
1✔
230
            .await
7✔
231
            .unwrap();
1✔
232
    assert_eq!(count_response.count, 52);
1✔
233
    assert_eq!(query_response.data.len(), 50);
1✔
234
}
1✔
235

236
#[tokio::test]
1✔
237
async fn test_typed_streaming1() {
1✔
238
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
239
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
240
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
241
        default_pipeline_internal.host,
1✔
242
        default_pipeline_internal.port,
1✔
243
        rx_internal,
1✔
244
    ));
1✔
245
    let (_tx, rx) = oneshot::channel::<()>();
1✔
246
    let _jh = tokio::spawn(async move {
1✔
247
        let typed_service = setup_typed_service(None);
1✔
248
        Server::builder()
1✔
249
            .add_service(typed_service)
1✔
250
            .serve_with_shutdown("127.0.0.1:14321".parse().unwrap(), rx.map(drop))
1✔
251
            .await
1✔
252
            .unwrap();
×
253
    });
1✔
254
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
255
    let address = "http://127.0.0.1:14321".to_owned();
1✔
256
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
257

1✔
258
    let request = FilmEventRequest {
1✔
259
        r#type: EventType::All as i32,
1✔
260
        filter: None,
1✔
261
    };
1✔
262
    let stream = client
1✔
263
        .on_event(Request::new(request))
1✔
264
        .await
2✔
265
        .unwrap()
1✔
266
        .into_inner();
1✔
267
    let mut stream = stream.take(1);
1✔
268
    while let Some(item) = stream.next().await {
1✔
269
        let response: FilmEvent = item.unwrap();
×
270
        assert!(response.new.is_some());
×
271
    }
272
    _ = sender_shutdown_internal.send(());
1✔
273
}
1✔
274

275
#[tokio::test]
1✔
276
async fn test_typed_streaming2() {
1✔
277
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
278
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
279
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
280
        default_pipeline_internal.host,
1✔
281
        default_pipeline_internal.port,
1✔
282
        rx_internal,
1✔
283
    ));
1✔
284
    let (_tx, rx) = oneshot::channel::<()>();
1✔
285
    let _jh = tokio::spawn(async move {
1✔
286
        let typed_service = setup_typed_service(None);
1✔
287
        Server::builder()
1✔
288
            .add_service(typed_service)
1✔
289
            .serve_with_shutdown("127.0.0.1:14322".parse().unwrap(), rx.map(drop))
1✔
290
            .await
1✔
291
            .unwrap();
×
292
    });
1✔
293
    tokio::time::sleep(Duration::from_millis(1001)).await;
1✔
294
    let address = "http://127.0.0.1:14322".to_owned();
1✔
295
    let request = FilmEventRequest {
1✔
296
        r#type: EventType::All as i32,
1✔
297
        filter: Some(r#"{ "film_id": 32 }"#.into()),
1✔
298
    };
1✔
299
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
300
    let stream = client
1✔
301
        .on_event(Request::new(request))
1✔
302
        .await
2✔
303
        .unwrap()
1✔
304
        .into_inner();
1✔
305
    let mut stream = stream.take(1);
1✔
306
    while let Some(item) = stream.next().await {
2✔
307
        let response: FilmEvent = item.unwrap();
1✔
308
        assert!(response.new.is_some());
1✔
309
    }
310
    _ = sender_shutdown_internal.send(());
1✔
311
}
1✔
312

313
#[tokio::test]
1✔
314
async fn test_typed_streaming3() {
1✔
315
    let (sender_shutdown_internal, rx_internal) = oneshot::channel::<()>();
1✔
316
    let default_pipeline_internal = default_api_config().pipeline_internal.unwrap_or_default();
1✔
317
    let _jh1 = tokio::spawn(start_fake_internal_grpc_pipeline(
1✔
318
        default_pipeline_internal.host,
1✔
319
        default_pipeline_internal.port,
1✔
320
        rx_internal,
1✔
321
    ));
1✔
322
    let (_tx, rx) = oneshot::channel::<()>();
1✔
323
    let _jh = tokio::spawn(async move {
1✔
324
        let typed_service = setup_typed_service(None);
1✔
325
        Server::builder()
1✔
326
            .add_service(typed_service)
1✔
327
            .serve_with_shutdown("127.0.0.1:14323".parse().unwrap(), rx.map(drop))
1✔
328
            .await
1✔
329
            .unwrap();
×
330
    });
1✔
331
    tokio::time::sleep(Duration::from_millis(100)).await;
1✔
332
    let address = "http://127.0.0.1:14323".to_owned();
1✔
333
    let mut client = FilmsClient::connect(address.to_owned()).await.unwrap();
2✔
334
    let request = FilmEventRequest {
1✔
335
        r#type: EventType::All as i32,
1✔
336
        filter: Some(r#"{ "film_id": 0 }"#.into()),
1✔
337
    };
1✔
338
    let mut stream = client
1✔
339
        .on_event(Request::new(request))
1✔
340
        .await
2✔
341
        .unwrap()
1✔
342
        .into_inner();
1✔
343
    let error_timeout = timeout(Duration::from_secs(1), stream.next()).await;
1✔
344
    assert!(error_timeout.is_err() || error_timeout.unwrap().is_none());
1✔
345
    _ = sender_shutdown_internal.send(());
1✔
346
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc