• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.57
/dozer-core/src/dag/tests/dag_schemas.rs
1
use crate::dag::dag::{Dag, Endpoint, NodeType, DEFAULT_PORT_HANDLE};
2
use crate::dag::dag_schemas::DagSchemas;
3
use crate::dag::errors::ExecutionError;
4
use crate::dag::executor::{DagExecutor, ExecutorOptions};
5
use crate::dag::node::{
6
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
7
    SinkFactory, Source, SourceFactory,
8
};
9

10
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
11
use std::collections::HashMap;
12

13
use crate::dag::tests::app::NoneContext;
14
use std::sync::atomic::AtomicBool;
15
use std::sync::Arc;
16
use tempdir::TempDir;
17

18
/// Unwraps a `Result`, panicking with the error's `Display` output on `Err`.
///
/// Expands to an expression that evaluates to the `Ok` value, so it can be
/// used on the right-hand side of a `let` or as a bare statement. The error is
/// handed straight to `panic!`'s formatter rather than being converted with
/// `to_string()` first, which only produced a throw-away `String`.
macro_rules! chk {
    ($stmt:expr) => {
        $stmt.unwrap_or_else(|e| panic!("{}", e))
    };
}
23

24
#[derive(Debug)]
×
25
struct TestUsersSourceFactory {}
26

27
impl SourceFactory<NoneContext> for TestUsersSourceFactory {
28
    fn get_output_schema(
5✔
29
        &self,
5✔
30
        _port: &PortHandle,
5✔
31
    ) -> Result<(Schema, NoneContext), ExecutionError> {
5✔
32
        Ok((
5✔
33
            Schema::empty()
5✔
34
                .field(
5✔
35
                    FieldDefinition::new(
5✔
36
                        "user_id".to_string(),
5✔
37
                        FieldType::String,
5✔
38
                        false,
5✔
39
                        SourceDefinition::Dynamic,
5✔
40
                    ),
5✔
41
                    true,
5✔
42
                )
5✔
43
                .field(
5✔
44
                    FieldDefinition::new(
5✔
45
                        "username".to_string(),
5✔
46
                        FieldType::String,
5✔
47
                        false,
5✔
48
                        SourceDefinition::Dynamic,
5✔
49
                    ),
5✔
50
                    true,
5✔
51
                )
5✔
52
                .field(
5✔
53
                    FieldDefinition::new(
5✔
54
                        "country_id".to_string(),
5✔
55
                        FieldType::String,
5✔
56
                        false,
5✔
57
                        SourceDefinition::Dynamic,
5✔
58
                    ),
5✔
59
                    true,
5✔
60
                )
5✔
61
                .clone(),
5✔
62
            NoneContext {},
5✔
63
        ))
5✔
64
    }
5✔
65

66
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
9✔
67
        Ok(vec![OutputPortDef::new(
9✔
68
            DEFAULT_PORT_HANDLE,
9✔
69
            OutputPortType::Stateless,
9✔
70
        )])
9✔
71
    }
9✔
72

73
    fn prepare(
×
74
        &self,
×
75
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
76
    ) -> Result<(), ExecutionError> {
×
77
        Ok(())
×
78
    }
×
79

80
    fn build(
×
81
        &self,
×
82
        _input_schemas: HashMap<PortHandle, Schema>,
×
83
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
84
        todo!()
×
85
    }
×
86
}
87

88
#[derive(Debug)]
×
89
struct TestCountriesSourceFactory {}
90

91
impl SourceFactory<NoneContext> for TestCountriesSourceFactory {
92
    fn get_output_schema(
3✔
93
        &self,
3✔
94
        _port: &PortHandle,
3✔
95
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
96
        Ok((
3✔
97
            Schema::empty()
3✔
98
                .field(
3✔
99
                    FieldDefinition::new(
3✔
100
                        "country_id".to_string(),
3✔
101
                        FieldType::String,
3✔
102
                        false,
3✔
103
                        SourceDefinition::Dynamic,
3✔
104
                    ),
3✔
105
                    true,
3✔
106
                )
3✔
107
                .field(
3✔
108
                    FieldDefinition::new(
3✔
109
                        "country_name".to_string(),
3✔
110
                        FieldType::String,
3✔
111
                        false,
3✔
112
                        SourceDefinition::Dynamic,
3✔
113
                    ),
3✔
114
                    true,
3✔
115
                )
3✔
116
                .clone(),
3✔
117
            NoneContext {},
3✔
118
        ))
3✔
119
    }
3✔
120

121
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
5✔
122
        Ok(vec![OutputPortDef::new(
5✔
123
            DEFAULT_PORT_HANDLE,
5✔
124
            OutputPortType::Stateless,
5✔
125
        )])
5✔
126
    }
5✔
127

128
    fn prepare(
×
129
        &self,
×
130
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
131
    ) -> Result<(), ExecutionError> {
×
132
        Ok(())
×
133
    }
×
134

135
    fn build(
×
136
        &self,
×
137
        _input_schemas: HashMap<PortHandle, Schema>,
×
138
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
139
        todo!()
×
140
    }
×
141
}
142

143
#[derive(Debug)]
×
144
struct TestJoinProcessorFactory {}
145

146
impl ProcessorFactory<NoneContext> for TestJoinProcessorFactory {
147
    fn get_output_schema(
4✔
148
        &self,
4✔
149
        _output_port: &PortHandle,
4✔
150
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
4✔
151
    ) -> Result<(Schema, NoneContext), ExecutionError> {
4✔
152
        let mut joined: Vec<FieldDefinition> = Vec::new();
4✔
153
        joined.extend(input_schemas.get(&1).unwrap().0.fields.clone());
4✔
154
        joined.extend(input_schemas.get(&2).unwrap().0.fields.clone());
4✔
155
        Ok((
4✔
156
            Schema {
4✔
157
                fields: joined,
4✔
158
                primary_index: vec![],
4✔
159
                identifier: None,
4✔
160
            },
4✔
161
            NoneContext {},
4✔
162
        ))
4✔
163
    }
4✔
164

165
    fn get_input_ports(&self) -> Vec<PortHandle> {
10✔
166
        vec![1, 2]
10✔
167
    }
10✔
168

169
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
7✔
170
        vec![OutputPortDef::new(
7✔
171
            DEFAULT_PORT_HANDLE,
7✔
172
            OutputPortType::Stateless,
7✔
173
        )]
7✔
174
    }
7✔
175

176
    fn prepare(
×
177
        &self,
×
178
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
179
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
180
    ) -> Result<(), ExecutionError> {
×
181
        Ok(())
×
182
    }
×
183

184
    fn build(
×
185
        &self,
×
186
        _input_schemas: HashMap<PortHandle, Schema>,
×
187
        _output_schemas: HashMap<PortHandle, Schema>,
×
188
    ) -> Result<Box<dyn Processor>, ExecutionError> {
×
189
        todo!()
×
190
    }
×
191
}
192

193
#[derive(Debug)]
×
194
struct TestSinkFactory {}
195

196
impl SinkFactory<NoneContext> for TestSinkFactory {
197
    fn get_input_ports(&self) -> Vec<PortHandle> {
7✔
198
        vec![DEFAULT_PORT_HANDLE]
7✔
199
    }
7✔
200

×
201
    fn prepare(
×
202
        &self,
×
203
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
204
    ) -> Result<(), ExecutionError> {
×
205
        Ok(())
×
206
    }
×
207

208
    fn build(
×
209
        &self,
×
210
        _input_schemas: HashMap<PortHandle, Schema>,
×
211
    ) -> Result<Box<dyn crate::dag::node::Sink>, ExecutionError> {
×
212
        todo!()
×
213
    }
×
214
}
215

×
216
/// Wires users + countries -> join -> sink and checks the number of fields
/// `DagSchemas` reports on every node's input and output ports.
#[test]
fn test_extract_dag_schemas() {
    // Number of fields in the schema stored for `$port` in `$schemas`.
    macro_rules! field_count {
        ($schemas:expr, $port:expr) => {
            $schemas.get(&$port).unwrap().0.fields.len()
        };
    }

    let mut dag = Dag::new();

    let users_handle = NodeHandle::new(Some(1), "1".to_string());
    let countries_handle = NodeHandle::new(Some(1), "2".to_string());
    let join_handle = NodeHandle::new(Some(1), "3".to_string());
    let sink_handle = NodeHandle::new(Some(1), "4".to_string());

    // Nodes: two sources, one join processor, one sink.
    let users_node = NodeType::Source(Arc::new(TestUsersSourceFactory {}));
    dag.add_node(users_node, users_handle.clone());
    let countries_node = NodeType::Source(Arc::new(TestCountriesSourceFactory {}));
    dag.add_node(countries_node, countries_handle.clone());
    let join_node = NodeType::Processor(Arc::new(TestJoinProcessorFactory {}));
    dag.add_node(join_node, join_handle.clone());
    let sink_node = NodeType::Sink(Arc::new(TestSinkFactory {}));
    dag.add_node(sink_node, sink_handle.clone());

    // Edges: users -> join:1, countries -> join:2, join -> sink.
    let users_out_port = Endpoint::new(users_handle.clone(), DEFAULT_PORT_HANDLE);
    let join_left_port = Endpoint::new(join_handle.clone(), 1);
    chk!(dag.connect(users_out_port, join_left_port));

    let countries_out_port = Endpoint::new(countries_handle.clone(), DEFAULT_PORT_HANDLE);
    let join_right_port = Endpoint::new(join_handle.clone(), 2);
    chk!(dag.connect(countries_out_port, join_right_port));

    let join_out_port = Endpoint::new(join_handle.clone(), DEFAULT_PORT_HANDLE);
    let sink_in_port = Endpoint::new(sink_handle.clone(), DEFAULT_PORT_HANDLE);
    chk!(dag.connect(join_out_port, sink_in_port));

    let dag_schemas = chk!(DagSchemas::new(&dag));

    // Sources report their own schemas: 3 user fields, 2 country fields.
    let users_out = chk!(dag_schemas.get_node_output_schemas(&users_handle));
    assert_eq!(field_count!(users_out, DEFAULT_PORT_HANDLE), 3);

    let countries_out = chk!(dag_schemas.get_node_output_schemas(&countries_handle));
    assert_eq!(field_count!(countries_out, DEFAULT_PORT_HANDLE), 2);

    // The join sees each source schema on the port it was connected to.
    let join_in = chk!(dag_schemas.get_node_input_schemas(&join_handle));
    assert_eq!(field_count!(join_in, 1), 3);
    assert_eq!(field_count!(join_in, 2), 2);

    // The join output concatenates both inputs: 3 + 2 fields.
    let join_out = chk!(dag_schemas.get_node_output_schemas(&join_handle));
    assert_eq!(field_count!(join_out, DEFAULT_PORT_HANDLE), 5);

    // The sink receives the join output unchanged.
    let sink_in = chk!(dag_schemas.get_node_input_schemas(&sink_handle));
    assert_eq!(field_count!(sink_in, DEFAULT_PORT_HANDLE), 5);
}
1✔
300

301
/// Creates a `DagExecutor` twice for the same DAG and directory (both must
/// succeed), then once more for a DAG whose "countries" node uses the users
/// source factory instead, which must be rejected.
#[test]
fn test_init_metadata() {
    let users_handle = NodeHandle::new(Some(1), "1".to_string());
    let countries_handle = NodeHandle::new(Some(1), "2".to_string());
    let join_handle = NodeHandle::new(Some(1), "3".to_string());
    let sink_handle = NodeHandle::new(Some(1), "4".to_string());

    // Reference DAG: users + countries -> join -> sink.
    let mut dag = Dag::new();
    let users_node = NodeType::Source(Arc::new(TestUsersSourceFactory {}));
    dag.add_node(users_node, users_handle.clone());
    let countries_node = NodeType::Source(Arc::new(TestCountriesSourceFactory {}));
    dag.add_node(countries_node, countries_handle.clone());
    let join_node = NodeType::Processor(Arc::new(TestJoinProcessorFactory {}));
    dag.add_node(join_node, join_handle.clone());
    let sink_node = NodeType::Sink(Arc::new(TestSinkFactory {}));
    dag.add_node(sink_node, sink_handle.clone());

    let users_out_port = Endpoint::new(users_handle.clone(), DEFAULT_PORT_HANDLE);
    let join_left_port = Endpoint::new(join_handle.clone(), 1);
    chk!(dag.connect(users_out_port, join_left_port));

    let countries_out_port = Endpoint::new(countries_handle.clone(), DEFAULT_PORT_HANDLE);
    let join_right_port = Endpoint::new(join_handle.clone(), 2);
    chk!(dag.connect(countries_out_port, join_right_port));

    let join_out_port = Endpoint::new(join_handle.clone(), DEFAULT_PORT_HANDLE);
    let sink_in_port = Endpoint::new(sink_handle.clone(), DEFAULT_PORT_HANDLE);
    chk!(dag.connect(join_out_port, sink_in_port));

    let work_dir = chk!(TempDir::new("example"));

    // First construction against the directory must succeed.
    let _first_run = chk!(DagExecutor::new(
        &dag,
        work_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));
    // A second construction for the same DAG and directory must succeed too.
    let _second_run = chk!(DagExecutor::new(
        &dag,
        work_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));

    // Same topology and handles, but the "countries" node now uses the users
    // source factory, so its output schema differs from the first DAG.
    let mut altered_dag = Dag::new();
    let users_node = NodeType::Source(Arc::new(TestUsersSourceFactory {}));
    altered_dag.add_node(users_node, users_handle.clone());
    let swapped_node = NodeType::Source(Arc::new(TestUsersSourceFactory {}));
    altered_dag.add_node(swapped_node, countries_handle.clone());
    let join_node = NodeType::Processor(Arc::new(TestJoinProcessorFactory {}));
    altered_dag.add_node(join_node, join_handle.clone());
    let sink_node = NodeType::Sink(Arc::new(TestSinkFactory {}));
    altered_dag.add_node(sink_node, sink_handle.clone());

    let users_out_port = Endpoint::new(users_handle, DEFAULT_PORT_HANDLE);
    let join_left_port = Endpoint::new(join_handle.clone(), 1);
    chk!(altered_dag.connect(users_out_port, join_left_port));

    let countries_out_port = Endpoint::new(countries_handle, DEFAULT_PORT_HANDLE);
    let join_right_port = Endpoint::new(join_handle.clone(), 2);
    chk!(altered_dag.connect(countries_out_port, join_right_port));

    let join_out_port = Endpoint::new(join_handle, DEFAULT_PORT_HANDLE);
    let sink_in_port = Endpoint::new(sink_handle, DEFAULT_PORT_HANDLE);
    chk!(altered_dag.connect(join_out_port, sink_in_port));

    // Constructing an executor for the altered DAG in the same directory must fail.
    let mismatched = DagExecutor::new(
        &altered_dag,
        work_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    );
    assert!(mismatched.is_err());
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc