getdozer / dozer · build 4075746792 (pending completion)

GitHub Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`
Merge 75b5f293b into 3223082a5

393 of 393 new or added lines in 12 files covered (100.0%)
24721 of 36724 relevant lines covered (67.32%)
55788.74 hits per line
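Per the PR title, `Dag` now delegates its underlying graph storage to the `daggy` crate. For context, here is a minimal sketch of the `daggy` API; this is not code from the PR, and it assumes `daggy`'s published interface, in which an edge that would introduce a cycle is rejected with a `WouldCycle` error:

fn main() {
    // Node and edge weights are arbitrary types; string labels and unit edge
    // weights are used here purely for illustration.
    let mut graph = daggy::Dag::<&str, ()>::new();
    let source = graph.add_node("source");
    let sink = graph.add_node("sink");

    // add_edge preserves the acyclic invariant: it returns Err(WouldCycle)
    // rather than inserting an edge that would close a cycle.
    graph.add_edge(source, sink, ()).expect("acyclic edge is accepted");
    assert!(graph.add_edge(sink, source, ()).is_err());
}

The test file below exercises dozer's own `Dag` wrapper rather than `daggy` directly.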

Source file: /dozer-core/src/dag/tests/dag_recordreader.rs (83.29% of lines covered)
#![allow(non_snake_case)]
use crate::dag::channels::ProcessorChannelForwarder;
use crate::dag::dag::{Dag, Endpoint, NodeType};
use crate::dag::errors::ExecutionError;
use crate::dag::executor::{DagExecutor, ExecutorOptions};
use crate::dag::node::{
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
};
use crate::dag::record_store::RecordReader;
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
use crate::dag::tests::sources::{
    GeneratorSourceFactory, NoPkGeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT,
};
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
use dozer_types::types::{Field, Operation, Schema};

use std::collections::HashMap;

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

use crate::dag::epoch::Epoch;
use crate::dag::tests::app::NoneContext;
use tempdir::TempDir;

// Unwrap a Result, panicking with the error's message on failure.
macro_rules! chk {
    ($stmt:expr) => {
        $stmt.unwrap_or_else(|e| panic!("{}", e.to_string()))
    };
}

pub(crate) const PASSTHROUGH_PROCESSOR_INPUT_PORT: PortHandle = 50;
pub(crate) const PASSTHROUGH_PROCESSOR_OUTPUT_PORT: PortHandle = 60;

// A processor that forwards every operation unchanged. Its output port is
// stateful with primary-key lookup, so downstream processors can read the
// records it has emitted through a RecordReader.
#[derive(Debug)]
pub(crate) struct PassthroughProcessorFactory {}

impl PassthroughProcessorFactory {
    pub fn new() -> Self {
        Self {}
    }
}

impl ProcessorFactory<NoneContext> for PassthroughProcessorFactory {
    fn get_output_schema(
        &self,
        _output_port: &PortHandle,
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
    ) -> Result<(Schema, NoneContext), ExecutionError> {
        Ok(input_schemas
            .get(&PASSTHROUGH_PROCESSOR_INPUT_PORT)
            .unwrap()
            .clone())
    }

    fn get_input_ports(&self) -> Vec<PortHandle> {
        vec![PASSTHROUGH_PROCESSOR_INPUT_PORT]
    }

    fn get_output_ports(&self) -> Vec<OutputPortDef> {
        vec![OutputPortDef::new(
            PASSTHROUGH_PROCESSOR_OUTPUT_PORT,
            OutputPortType::StatefulWithPrimaryKeyLookup {
                retr_old_records_for_deletes: true,
                retr_old_records_for_updates: true,
            },
        )]
    }

    fn prepare(
        &self,
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn build(
        &self,
        _input_schemas: HashMap<PortHandle, Schema>,
        _output_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Processor>, ExecutionError> {
        Ok(Box::new(PassthroughProcessor {}))
    }
}

#[derive(Debug)]
pub(crate) struct PassthroughProcessor {}

impl Processor for PassthroughProcessor {
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn commit(
        &self,
        _epoch_details: &Epoch,
        _tx: &SharedTransaction,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        _tx: &SharedTransaction,
        _readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        fw.send(op, PASSTHROUGH_PROCESSOR_OUTPUT_PORT)
    }
}

// A processor that, for every incoming operation, looks up the record stored
// under the key "key_{ctr}" through the RecordReader attached to its input
// port, asserts that it exists, and then forwards the operation.
#[derive(Debug)]
pub(crate) struct RecordReaderProcessorFactory {}

impl RecordReaderProcessorFactory {
    pub fn new() -> Self {
        Self {}
    }
}

pub(crate) const RECORD_READER_PROCESSOR_INPUT_PORT: PortHandle = 70;
pub(crate) const RECORD_READER_PROCESSOR_OUTPUT_PORT: PortHandle = 80;

impl ProcessorFactory<NoneContext> for RecordReaderProcessorFactory {
    fn get_output_schema(
        &self,
        _output_port: &PortHandle,
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
    ) -> Result<(Schema, NoneContext), ExecutionError> {
        Ok(input_schemas
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
            .unwrap()
            .clone())
    }

    fn get_input_ports(&self) -> Vec<PortHandle> {
        vec![RECORD_READER_PROCESSOR_INPUT_PORT]
    }

    fn get_output_ports(&self) -> Vec<OutputPortDef> {
        vec![OutputPortDef::new(
            RECORD_READER_PROCESSOR_OUTPUT_PORT,
            OutputPortType::Stateless,
        )]
    }

    fn prepare(
        &self,
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn build(
        &self,
        _input_schemas: HashMap<PortHandle, Schema>,
        _output_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Processor>, ExecutionError> {
        Ok(Box::new(RecordReaderProcessor { ctr: 1 }))
    }
}

#[derive(Debug)]
pub(crate) struct RecordReaderProcessor {
    ctr: u64,
}

impl Processor for RecordReaderProcessor {
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn commit(
        &self,
        _epoch_details: &Epoch,
        _tx: &SharedTransaction,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        _tx: &SharedTransaction,
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        let v = readers
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
            .unwrap()
            .get(
                Field::String(format!("key_{}", self.ctr))
                    .encode()
                    .as_slice(),
                1,
            )?;
        assert!(v.is_some());
        self.ctr += 1;

        fw.send(op, RECORD_READER_PROCESSOR_OUTPUT_PORT)
    }
}

// Pipeline: generator source -> passthrough (stateful output) -> record
// reader -> counting sink.
#[test]
fn test_run_dag_record_reader() {
    const TOT: u64 = 10_000;

    let sync = Arc::new(AtomicBool::new(true));

    let src = GeneratorSourceFactory::new(TOT, sync.clone(), false);
    let passthrough = PassthroughProcessorFactory::new();
    let record_reader = RecordReaderProcessorFactory::new();
    let sink = CountingSinkFactory::new(TOT, sync);

    let mut dag = Dag::new();

    let SOURCE_ID: NodeHandle = NodeHandle::new(None, 1.to_string());
    let PASSTHROUGH_ID: NodeHandle = NodeHandle::new(Some(1), 1.to_string());
    let RECORD_READER_ID: NodeHandle = NodeHandle::new(Some(1), 2.to_string());
    let SINK_ID: NodeHandle = NodeHandle::new(Some(1), 3.to_string());

    dag.add_node(NodeType::Source(Arc::new(src)), SOURCE_ID.clone());
    dag.add_node(
        NodeType::Processor(Arc::new(passthrough)),
        PASSTHROUGH_ID.clone(),
    );
    dag.add_node(
        NodeType::Processor(Arc::new(record_reader)),
        RECORD_READER_ID.clone(),
    );
    dag.add_node(NodeType::Sink(Arc::new(sink)), SINK_ID.clone());

    assert!(dag
        .connect(
            Endpoint::new(SOURCE_ID, GENERATOR_SOURCE_OUTPUT_PORT),
            Endpoint::new(PASSTHROUGH_ID.clone(), PASSTHROUGH_PROCESSOR_INPUT_PORT),
        )
        .is_ok());

    assert!(dag
        .connect(
            Endpoint::new(PASSTHROUGH_ID, PASSTHROUGH_PROCESSOR_OUTPUT_PORT),
            Endpoint::new(RECORD_READER_ID.clone(), RECORD_READER_PROCESSOR_INPUT_PORT),
        )
        .is_ok());

    assert!(dag
        .connect(
            Endpoint::new(RECORD_READER_ID, RECORD_READER_PROCESSOR_OUTPUT_PORT),
            Endpoint::new(SINK_ID, COUNTING_SINK_INPUT_PORT),
        )
        .is_ok());

    let options = ExecutorOptions {
        commit_sz: 1000,
        commit_time_threshold: Duration::from_millis(5),
        ..Default::default()
    };

    let tmp_dir = chk!(TempDir::new("test"));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        options,
        Arc::new(AtomicBool::new(true))
    ));

    chk!(executor.start());
    assert!(executor.join().is_ok());
}

// Same pipeline minus the passthrough stage: the record reader reads directly
// from the source's output port.
#[test]
fn test_run_dag_record_reader_from_src() {
    const TOT: u64 = 1_000;

    let sync = Arc::new(AtomicBool::new(true));

    let src = GeneratorSourceFactory::new(TOT, sync.clone(), true);
    let record_reader = RecordReaderProcessorFactory::new();
    let sink = CountingSinkFactory::new(TOT, sync);

    let mut dag = Dag::new();

    let SOURCE_ID: NodeHandle = NodeHandle::new(None, 1.to_string());
    let RECORD_READER_ID: NodeHandle = NodeHandle::new(Some(1), 1.to_string());
    let SINK_ID: NodeHandle = NodeHandle::new(Some(1), 2.to_string());

    dag.add_node(NodeType::Source(Arc::new(src)), SOURCE_ID.clone());
    dag.add_node(
        NodeType::Processor(Arc::new(record_reader)),
        RECORD_READER_ID.clone(),
    );
    dag.add_node(NodeType::Sink(Arc::new(sink)), SINK_ID.clone());

    assert!(dag
        .connect(
            Endpoint::new(SOURCE_ID, GENERATOR_SOURCE_OUTPUT_PORT),
            Endpoint::new(RECORD_READER_ID.clone(), RECORD_READER_PROCESSOR_INPUT_PORT),
        )
        .is_ok());

    assert!(dag
        .connect(
            Endpoint::new(RECORD_READER_ID, RECORD_READER_PROCESSOR_OUTPUT_PORT),
            Endpoint::new(SINK_ID, COUNTING_SINK_INPUT_PORT),
        )
        .is_ok());

    let tmp_dir = chk!(TempDir::new("test"));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));

    chk!(executor.start());
    assert!(executor.join().is_ok());
}

// Variant of the record-reader processor for sources without a primary key:
// records are looked up by the auto-generated u64 row key instead of an
// encoded string key.
#[derive(Debug)]
pub(crate) struct NoPkRecordReaderProcessorFactory {}

impl NoPkRecordReaderProcessorFactory {
    pub fn new() -> Self {
        Self {}
    }
}

impl ProcessorFactory<NoneContext> for NoPkRecordReaderProcessorFactory {
    fn get_output_schema(
        &self,
        _output_port: &PortHandle,
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
    ) -> Result<(Schema, NoneContext), ExecutionError> {
        Ok(input_schemas
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
            .unwrap()
            .clone())
    }

    fn get_input_ports(&self) -> Vec<PortHandle> {
        vec![RECORD_READER_PROCESSOR_INPUT_PORT]
    }

    fn get_output_ports(&self) -> Vec<OutputPortDef> {
        vec![OutputPortDef::new(
            RECORD_READER_PROCESSOR_OUTPUT_PORT,
            OutputPortType::Stateless,
        )]
    }

    fn prepare(
        &self,
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn build(
        &self,
        _input_schemas: HashMap<PortHandle, Schema>,
        _output_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Processor>, ExecutionError> {
        Ok(Box::new(NoPkRecordReaderProcessor { ctr: 1 }))
    }
}

#[derive(Debug)]
pub(crate) struct NoPkRecordReaderProcessor {
    ctr: u64,
}

impl Processor for NoPkRecordReaderProcessor {
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn commit(
        &self,
        _epoch_details: &Epoch,
        _tx: &SharedTransaction,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        _tx: &SharedTransaction,
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        let v = readers
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
            .unwrap()
            .get(Field::UInt(self.ctr).encode().as_slice(), 1)?;
        assert!(v.is_some());
        self.ctr += 1;

        fw.send(op, RECORD_READER_PROCESSOR_OUTPUT_PORT)
    }
}

// Pipeline: no-primary-key generator source -> no-PK record reader -> counting sink.
#[test]
fn test_run_dag_record_reader_from_rowkey_autogen_src() {
    const TOT: u64 = 1_000;

    let sync = Arc::new(AtomicBool::new(true));

    let src = NoPkGeneratorSourceFactory::new(TOT, sync.clone(), true);
    let record_reader = NoPkRecordReaderProcessorFactory::new();
    let sink = CountingSinkFactory::new(TOT, sync);

    let mut dag = Dag::new();

    let SOURCE_ID: NodeHandle = NodeHandle::new(None, 1.to_string());
    let RECORD_READER_ID: NodeHandle = NodeHandle::new(Some(1), 1.to_string());
    let SINK_ID: NodeHandle = NodeHandle::new(Some(1), 2.to_string());

    dag.add_node(NodeType::Source(Arc::new(src)), SOURCE_ID.clone());
    dag.add_node(
        NodeType::Processor(Arc::new(record_reader)),
        RECORD_READER_ID.clone(),
    );
    dag.add_node(NodeType::Sink(Arc::new(sink)), SINK_ID.clone());

    assert!(dag
        .connect(
            Endpoint::new(SOURCE_ID, GENERATOR_SOURCE_OUTPUT_PORT),
            Endpoint::new(RECORD_READER_ID.clone(), RECORD_READER_PROCESSOR_INPUT_PORT),
        )
        .is_ok());

    assert!(dag
        .connect(
            Endpoint::new(RECORD_READER_ID, RECORD_READER_PROCESSOR_OUTPUT_PORT),
            Endpoint::new(SINK_ID, COUNTING_SINK_INPUT_PORT),
        )
        .is_ok());

    let tmp_dir = chk!(TempDir::new("test"));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));

    chk!(executor.start());
    assert!(executor.join().is_ok());
}