• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::epoch::Epoch;
6
use dozer_core::dag::errors::ExecutionError;
7
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::dag::node::{
9
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
10
};
11
use dozer_core::dag::record_store::RecordReader;
12
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::tracing::{debug, info};
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20
use std::sync::atomic::{AtomicBool, Ordering};
21
use std::sync::Arc;
22
use tempdir::TempDir;
23

24
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
25

26
const USER_PORT: u16 = 0 as PortHandle;
27
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
28
const COUNTRY_PORT: u16 = 2 as PortHandle;
29

30
/// Factory for the in-memory [`TestSource`] used by the pipeline test.
#[derive(Debug)]
pub struct TestSourceFactory {
    /// Shared flag; a clone of it is handed to every source this factory builds.
    running: Arc<AtomicBool>,
}

impl TestSourceFactory {
    /// Creates a factory whose sources will observe `running`.
    pub fn new(running: Arc<AtomicBool>) -> Self {
        TestSourceFactory { running }
    }
}
40

41
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
42
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
43
        Ok(vec![
×
44
            OutputPortDef::new(
×
45
                USER_PORT,
×
46
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
47
                    retr_old_records_for_updates: true,
×
48
                    retr_old_records_for_deletes: true,
×
49
                },
×
50
            ),
×
51
            OutputPortDef::new(
×
52
                DEPARTMENT_PORT,
×
53
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
54
                    retr_old_records_for_updates: true,
×
55
                    retr_old_records_for_deletes: true,
×
56
                },
×
57
            ),
×
58
            OutputPortDef::new(
×
59
                COUNTRY_PORT,
×
60
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
61
                    retr_old_records_for_updates: true,
×
62
                    retr_old_records_for_deletes: true,
×
63
                },
×
64
            ),
×
65
        ])
×
66
    }
×
67

68
    fn get_output_schema(
×
69
        &self,
×
70
        port: &PortHandle,
×
71
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
72
        if port == &USER_PORT {
×
73
            let source_id = SourceDefinition::Table {
×
74
                connection: "connection".to_string(),
×
75
                name: "user".to_string(),
×
76
            };
×
77
            Ok((
×
78
                Schema::empty()
×
79
                    .field(
×
80
                        FieldDefinition::new(
×
81
                            String::from("id"),
×
82
                            FieldType::Int,
×
83
                            false,
×
84
                            source_id.clone(),
×
85
                        ),
×
86
                        true,
×
87
                    )
×
88
                    .field(
×
89
                        FieldDefinition::new(
×
90
                            String::from("name"),
×
91
                            FieldType::String,
×
92
                            false,
×
93
                            source_id.clone(),
×
94
                        ),
×
95
                        false,
×
96
                    )
×
97
                    .field(
×
98
                        FieldDefinition::new(
×
99
                            String::from("department_id"),
×
100
                            FieldType::Int,
×
101
                            false,
×
102
                            source_id.clone(),
×
103
                        ),
×
104
                        false,
×
105
                    )
×
106
                    .field(
×
107
                        FieldDefinition::new(
×
108
                            String::from("country_id"),
×
109
                            FieldType::String,
×
110
                            false,
×
111
                            source_id.clone(),
×
112
                        ),
×
113
                        false,
×
114
                    )
×
115
                    .field(
×
116
                        FieldDefinition::new(
×
117
                            String::from("salary"),
×
118
                            FieldType::Float,
×
119
                            false,
×
120
                            source_id,
×
121
                        ),
×
122
                        false,
×
123
                    )
×
124
                    .clone(),
×
125
                SchemaSQLContext::default(),
×
126
            ))
×
127
        } else if port == &DEPARTMENT_PORT {
×
128
            let source_id = SourceDefinition::Table {
×
129
                connection: "connection".to_string(),
×
130
                name: "department".to_string(),
×
131
            };
×
132
            Ok((
×
133
                Schema::empty()
×
134
                    .field(
×
135
                        FieldDefinition::new(
×
136
                            String::from("did"),
×
137
                            FieldType::Int,
×
138
                            false,
×
139
                            source_id.clone(),
×
140
                        ),
×
141
                        true,
×
142
                    )
×
143
                    .field(
×
144
                        FieldDefinition::new(
×
145
                            String::from("dname"),
×
146
                            FieldType::String,
×
147
                            false,
×
148
                            source_id,
×
149
                        ),
×
150
                        false,
×
151
                    )
×
152
                    .clone(),
×
153
                SchemaSQLContext::default(),
×
154
            ))
×
155
        } else if port == &COUNTRY_PORT {
×
156
            let source_id = SourceDefinition::Table {
×
157
                connection: "connection".to_string(),
×
158
                name: "country".to_string(),
×
159
            };
×
160
            Ok((
×
161
                Schema::empty()
×
162
                    .field(
×
163
                        FieldDefinition::new(
×
164
                            String::from("cid"),
×
165
                            FieldType::String,
×
166
                            false,
×
167
                            source_id.clone(),
×
168
                        ),
×
169
                        true,
×
170
                    )
×
171
                    .field(
×
172
                        FieldDefinition::new(
×
173
                            String::from("cname"),
×
174
                            FieldType::String,
×
175
                            false,
×
176
                            source_id,
×
177
                        ),
×
178
                        false,
×
179
                    )
×
180
                    .clone(),
×
181
                SchemaSQLContext::default(),
×
182
            ))
×
183
        } else {
184
            panic!("Invalid Port Handle {port}");
×
185
        }
186
    }
×
187

188
    fn build(
×
189
        &self,
×
190
        _output_schemas: HashMap<PortHandle, Schema>,
×
191
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
192
        Ok(Box::new(TestSource {
×
193
            running: self.running.clone(),
×
194
        }))
×
195
    }
×
196

197
    fn prepare(
×
198
        &self,
×
199
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
200
    ) -> Result<(), ExecutionError> {
×
201
        Ok(())
×
202
    }
×
203
}
204

205
/// In-memory source that emits a fixed set of operations for the pipeline test.
#[derive(Debug)]
pub struct TestSource {
    /// Shared flag; `start` keeps the source alive until it reads `false`.
    running: Arc<AtomicBool>,
}
209

210
impl Source for TestSource {
211
    fn start(
×
212
        &self,
×
213
        fw: &mut dyn SourceChannelForwarder,
×
214
        _from_seq: Option<(u64, u64)>,
×
215
    ) -> Result<(), ExecutionError> {
×
216
        let operations = vec![
×
217
            (
×
218
                Operation::Insert {
×
219
                    new: Record::new(
×
220
                        None,
×
221
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
222
                        Some(1),
×
223
                    ),
×
224
                },
×
225
                DEPARTMENT_PORT,
×
226
            ),
×
227
            // (
×
228
            //     Operation::Insert {
×
229
            //         new: Record::new(
×
230
            //             None,
×
231
            //             vec![Field::Int(1), Field::String("HR".to_string())],
×
232
            //             Some(1),
×
233
            //         ),
×
234
            //     },
×
235
            //     DEPARTMENT_PORT,
×
236
            // ),
×
237
            (
×
238
                Operation::Insert {
×
239
                    new: Record::new(
×
240
                        None,
×
241
                        vec![
×
242
                            Field::Int(10000),
×
243
                            Field::String("Alice".to_string()),
×
244
                            Field::Int(0),
×
245
                            Field::String("UK".to_string()),
×
246
                            Field::Float(OrderedFloat(1.1)),
×
247
                        ],
×
248
                        Some(1),
×
249
                    ),
×
250
                },
×
251
                USER_PORT,
×
252
            ),
×
253
            (
×
254
                Operation::Insert {
×
255
                    new: Record::new(
×
256
                        None,
×
257
                        vec![
×
258
                            Field::Int(10001),
×
259
                            Field::String("Bob".to_string()),
×
260
                            Field::Int(0),
×
261
                            Field::String("UK".to_string()),
×
262
                            Field::Float(OrderedFloat(1.1)),
×
263
                        ],
×
264
                        Some(1),
×
265
                    ),
×
266
                },
×
267
                USER_PORT,
×
268
            ),
×
269
            // (
×
270
            //     Operation::Insert {
×
271
            //         new: Record::new(
×
272
            //             None,
×
273
            //             vec![
×
274
            //                 Field::String("UK".to_string()),
×
275
            //                 Field::String("United Kingdom".to_string()),
×
276
            //             ],
×
277
            //             Some(1),
×
278
            //         ),
×
279
            //     },
×
280
            //     COUNTRY_PORT,
×
281
            // ),
×
282
            // (
×
283
            //     Operation::Insert {
×
284
            //         new: Record::new(
×
285
            //             None,
×
286
            //             vec![
×
287
            //                 Field::String("SG".to_string()),
×
288
            //                 Field::String("Singapore".to_string()),
×
289
            //             ],
×
290
            //             Some(1),
×
291
            //         ),
×
292
            //     },
×
293
            //     COUNTRY_PORT,
×
294
            // ),
×
295
            (
×
296
                Operation::Insert {
×
297
                    new: Record::new(
×
298
                        None,
×
299
                        vec![
×
300
                            Field::Int(10002),
×
301
                            Field::String("Craig".to_string()),
×
302
                            Field::Int(1),
×
303
                            Field::String("SG".to_string()),
×
304
                            Field::Float(OrderedFloat(1.1)),
×
305
                        ],
×
306
                        Some(1),
×
307
                    ),
×
308
                },
×
309
                USER_PORT,
×
310
            ),
×
311
            // (
×
312
            //     Operation::Delete {
×
313
            //         old: Record::new(
×
314
            //             None,
×
315
            //             vec![Field::Int(1), Field::String("HR".to_string())],
×
316
            //             Some(1),
×
317
            //         ),
×
318
            //     },
×
319
            //     DEPARTMENT_PORT,
×
320
            // ),
×
321
            (
×
322
                Operation::Insert {
×
323
                    new: Record::new(
×
324
                        None,
×
325
                        vec![
×
326
                            Field::Int(10003),
×
327
                            Field::String("Dan".to_string()),
×
328
                            Field::Int(0),
×
329
                            Field::String("UK".to_string()),
×
330
                            Field::Float(OrderedFloat(1.1)),
×
331
                        ],
×
332
                        Some(1),
×
333
                    ),
×
334
                },
×
335
                USER_PORT,
×
336
            ),
×
337
            (
×
338
                Operation::Insert {
×
339
                    new: Record::new(
×
340
                        None,
×
341
                        vec![
×
342
                            Field::Int(10004),
×
343
                            Field::String("Eve".to_string()),
×
344
                            Field::Int(1),
×
345
                            Field::String("SG".to_string()),
×
346
                            Field::Float(OrderedFloat(1.1)),
×
347
                        ],
×
348
                        Some(1),
×
349
                    ),
×
350
                },
×
351
                USER_PORT,
×
352
            ),
×
353
            // (
×
354
            //     Operation::Delete {
×
355
            //         old: Record::new(
×
356
            //             None,
×
357
            //             vec![
×
358
            //                 Field::Int(10002),
×
359
            //                 Field::String("Craig".to_string()),
×
360
            //                 Field::Int(1),
×
361
            //                 Field::Float(OrderedFloat(1.1)),
×
362
            //             ],
×
363
            //             None,
×
364
            //         ),
×
365
            //     },
×
366
            //     USER_PORT,
×
367
            // ),
×
368
            (
×
369
                Operation::Insert {
×
370
                    new: Record::new(
×
371
                        None,
×
372
                        vec![
×
373
                            Field::Int(10005),
×
374
                            Field::String("Frank".to_string()),
×
375
                            Field::Int(1),
×
376
                            Field::String("SG".to_string()),
×
377
                            Field::Float(OrderedFloat(1.5)),
×
378
                        ],
×
379
                        None,
×
380
                    ),
×
381
                },
×
382
                USER_PORT,
×
383
            ),
×
384
            (
×
385
                Operation::Update {
×
386
                    old: Record::new(
×
387
                        None,
×
388
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
389
                        Some(1),
×
390
                    ),
×
391
                    new: Record::new(
×
392
                        None,
×
393
                        vec![Field::Int(0), Field::String("RD".to_string())],
×
394
                        Some(2),
×
395
                    ),
×
396
                },
×
397
                DEPARTMENT_PORT,
×
398
            ),
×
399
            // (
×
400
            //     Operation::Update {
×
401
            //         old: Record::new(
×
402
            //             None,
×
403
            //             vec![Field::Int(0), Field::String("IT".to_string())],
×
404
            //             None,
×
405
            //         ),
×
406
            //         new: Record::new(
×
407
            //             None,
×
408
            //             vec![Field::Int(0), Field::String("XX".to_string())],
×
409
            //             None,
×
410
            //         ),
×
411
            //     },
×
412
            //     DEPARTMENT_PORT,
×
413
            // ),
×
414
        ];
×
415

416
        for operation in operations.iter().enumerate() {
×
417
            // match operation.1.clone().0 {
×
418
            //     Operation::Delete { old } => {
×
419
            //         info!("s{}: - {:?}", operation.1.clone().1, old.values)
×
420
            //     }
×
421
            //     Operation::Insert { new } => {
×
422
            //         info!("s{}: + {:?}", operation.1.clone().1, new.values)
×
423
            //     }
×
424
            //     Operation::Update { old, new } => {
×
425
            //         info!(
×
426
            //             "s{}: - {:?}, + {:?}",
×
427
            //             operation.1.clone().1,
×
428
            //             old.values,
×
429
            //             new.values
×
430
            //         )
×
431
            //     }
×
432
            // }
×
433
            fw.send(
×
434
                operation.0.try_into().unwrap(),
×
435
                0,
×
436
                operation.1.clone().0,
×
437
                operation.1.clone().1,
×
438
            )
×
439
            .unwrap();
×
440
        }
×
441

442
        loop {
×
443
            if !self.running.load(Ordering::Relaxed) {
×
444
                break;
×
445
            }
×
446
            // thread::sleep(Duration::from_millis(500));
447
        }
448
        Ok(())
×
449
    }
×
450
}
451

452
/// Factory for the [`TestSink`] used by the pipeline test.
#[derive(Debug)]
pub(crate) struct TestSinkFactory {
    /// Number of operations each built sink waits for before clearing `running`.
    expected: u64,
    /// Shared flag; a clone of it is handed to every sink this factory builds.
    running: Arc<AtomicBool>,
}

impl TestSinkFactory {
    /// Creates a factory whose sinks clear `barrier` after `expected` operations.
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
        let running = barrier;
        TestSinkFactory { expected, running }
    }
}
466

467
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
468
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
469
        vec![DEFAULT_PORT_HANDLE]
×
470
    }
×
471

472
    fn build(
×
473
        &self,
×
474
        _input_schemas: HashMap<PortHandle, Schema>,
×
475
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
476
        Ok(Box::new(TestSink {
×
477
            expected: self.expected,
×
478
            current: 0,
×
479
            running: self.running.clone(),
×
480
        }))
×
481
    }
×
482

×
483
    fn prepare(
×
484
        &self,
×
485
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
486
    ) -> Result<(), ExecutionError> {
×
487
        Ok(())
×
488
    }
×
489
}
490

×
491
/// Sink that counts incoming operations and clears a shared flag once the
/// expected number has been processed.
#[derive(Debug)]
pub struct TestSink {
    /// Number of operations after which `running` is cleared.
    expected: u64,
    /// Number of operations processed so far.
    current: u64,
    /// Shared flag set to `false` when `current` reaches `expected`.
    running: Arc<AtomicBool>,
}
497

498
impl Sink for TestSink {
×
499
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
500
        debug!("SINK: Initialising TestSink");
×
501
        Ok(())
×
502
    }
×
503

504
    fn process(
×
505
        &mut self,
×
506
        _from_port: PortHandle,
×
507
        _op: Operation,
×
508
        _state: &SharedTransaction,
×
509
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
510
    ) -> Result<(), ExecutionError> {
×
511
        match _op {
×
512
            Operation::Delete { old } => info!("o0:-> - {:?}", old.values),
×
513
            Operation::Insert { new } => info!("o0:-> + {:?}", new.values),
×
514
            Operation::Update { old, new } => {
×
515
                info!("o0:-> - {:?}, + {:?}", old.values, new.values)
×
516
            }
×
517
        }
×
518

×
519
        self.current += 1;
×
520
        if self.current == self.expected {
×
521
            debug!(
×
522
                "Received {} messages. Notifying sender to exit!",
×
523
                self.current
×
524
            );
×
525
            self.running.store(false, Ordering::Relaxed);
×
526
        }
×
527
        Ok(())
×
528
    }
×
529

×
530
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
531
        Ok(())
×
532
    }
×
533
}
×
534

×
535
/// Builds a pipeline from a two-way `LEFT JOIN` query, feeds it from
/// `TestSourceFactory`, and runs it until `TestSinkFactory`'s sink has
/// processed 8 operations and cleared the shared latch.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested
/// (e.g. `cargo test -- --ignored`).
#[test]
#[ignore]
fn test_pipeline_builder() {
    dozer_tracing::init_telemetry(false).unwrap();

    let (mut pipeline, (node, port)) = statement_to_pipeline(
        "SELECT  name, dname, cname, salary \
        FROM user LEFT JOIN department ON user.department_id = department.did LEFT JOIN country ON user.country_id = country.cid ",
    )
    .unwrap();

    // Shared between the source (which waits on it) and the sink (which clears it).
    let latch = Arc::new(AtomicBool::new(true));

    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "conn".to_string(),
        Arc::new(TestSourceFactory::new(latch.clone())),
        vec![
            ("user".to_string(), USER_PORT),
            ("department".to_string(), DEPARTMENT_PORT),
            ("country".to_string(), COUNTRY_PORT),
        ]
        .into_iter()
        .collect(),
    ))
    .unwrap();

    pipeline.add_sink(Arc::new(TestSinkFactory::new(8, latch)), "sink");
    pipeline
        .connect_nodes(&node, Some(port), "sink", Some(DEFAULT_PORT_HANDLE))
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    // Working directory handed to the executor; bound to a local so it
    // outlives the executor run.
    let tmp_dir = TempDir::new("test").unwrap();

    let now = std::time::Instant::now();

    let mut executor = DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    )
    .unwrap();

    executor
        .start()
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {e}"));
    assert!(
        executor.join().is_ok(),
        "Executor did not shut down cleanly"
    );

    let elapsed = now.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc