• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.23
/dozer-core/src/dag/tests/sources.rs
1
use crate::dag::channels::SourceChannelForwarder;
2
use crate::dag::errors::ExecutionError;
3
use crate::dag::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
4
use dozer_types::types::{
5
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
6
};
7

8
use std::collections::HashMap;
9
use std::sync::atomic::{AtomicBool, Ordering};
10
use std::sync::Arc;
11
use std::thread;
12

13
use crate::dag::tests::app::NoneContext;
14
use std::time::Duration;
15

16
/// Handle of the single output port declared by `GeneratorSourceFactory` and
/// `NoPkGeneratorSourceFactory`; their sources send every record on this port.
pub(crate) const GENERATOR_SOURCE_OUTPUT_PORT: PortHandle = 100;
17

18
/// Test factory that builds single-port `GeneratorSource` instances.
#[derive(Debug)]
pub(crate) struct GeneratorSourceFactory {
    /// Number of records each built source emits.
    count: u64,
    /// Shared flag handed to every built source; the source's `start` keeps
    /// waiting while it is `true`.
    running: Arc<AtomicBool>,
    /// When `true` the output port is declared as
    /// `StatefulWithPrimaryKeyLookup`, otherwise as `Stateless`.
    stateful: bool,
}

impl GeneratorSourceFactory {
    /// Creates a factory.
    ///
    /// * `count` - number of records each built source emits.
    /// * `barrier` - shared flag stored as `running`.
    /// * `stateful` - selects the type of the declared output port.
    pub fn new(count: u64, barrier: Arc<AtomicBool>, stateful: bool) -> Self {
        Self {
            count,
            running: barrier,
            stateful,
        }
    }
}
34

35
impl SourceFactory<NoneContext> for GeneratorSourceFactory {
36
    fn get_output_schema(
27✔
37
        &self,
27✔
38
        _port: &PortHandle,
27✔
39
    ) -> Result<(Schema, NoneContext), ExecutionError> {
27✔
40
        Ok((
27✔
41
            Schema::empty()
27✔
42
                .field(
27✔
43
                    FieldDefinition::new(
27✔
44
                        "id".to_string(),
27✔
45
                        FieldType::String,
27✔
46
                        false,
27✔
47
                        SourceDefinition::Dynamic,
27✔
48
                    ),
27✔
49
                    true,
27✔
50
                )
27✔
51
                .field(
27✔
52
                    FieldDefinition::new(
27✔
53
                        "value".to_string(),
27✔
54
                        FieldType::String,
27✔
55
                        false,
27✔
56
                        SourceDefinition::Dynamic,
27✔
57
                    ),
27✔
58
                    false,
27✔
59
                )
27✔
60
                .clone(),
27✔
61
            NoneContext {},
27✔
62
        ))
27✔
63
    }
27✔
64

65
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
84✔
66
        Ok(vec![OutputPortDef::new(
84✔
67
            GENERATOR_SOURCE_OUTPUT_PORT,
84✔
68
            if self.stateful {
84✔
69
                OutputPortType::StatefulWithPrimaryKeyLookup {
48✔
70
                    retr_old_records_for_updates: true,
48✔
71
                    retr_old_records_for_deletes: true,
48✔
72
                }
48✔
73
            } else {
74
                OutputPortType::Stateless
36✔
75
            },
76
        )])
77
    }
84✔
78

79
    fn prepare(
×
80
        &self,
×
81
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
82
    ) -> Result<(), ExecutionError> {
×
83
        Ok(())
×
84
    }
×
85

86
    fn build(
27✔
87
        &self,
27✔
88
        _input_schemas: HashMap<PortHandle, Schema>,
27✔
89
    ) -> Result<Box<dyn Source>, ExecutionError> {
27✔
90
        Ok(Box::new(GeneratorSource {
27✔
91
            count: self.count,
27✔
92
            running: self.running.clone(),
27✔
93
        }))
27✔
94
    }
27✔
95
}
96

97
/// Test source that emits `count` insert operations on
/// `GENERATOR_SOURCE_OUTPUT_PORT` and then waits until `running` is cleared.
#[derive(Debug)]
pub(crate) struct GeneratorSource {
    /// Number of records to emit.
    count: u64,
    /// Shared flag; `start` returns only after it reads `false`.
    running: Arc<AtomicBool>,
}
102

103
impl Source for GeneratorSource {
104
    fn start(
27✔
105
        &self,
27✔
106
        fw: &mut dyn SourceChannelForwarder,
27✔
107
        from_seq: Option<(u64, u64)>,
27✔
108
    ) -> Result<(), ExecutionError> {
27✔
109
        let start = from_seq.unwrap_or((0, 0)).0;
27✔
110

111
        for n in start + 1..(start + self.count + 1) {
3,657,236✔
112
            fw.send(
3,657,236✔
113
                n,
3,657,236✔
114
                0,
3,657,236✔
115
                Operation::Insert {
3,657,236✔
116
                    new: Record::new(
3,657,236✔
117
                        None,
3,657,236✔
118
                        vec![
3,657,236✔
119
                            Field::String(format!("key_{n}")),
3,657,236✔
120
                            Field::String(format!("value_{n}")),
3,657,236✔
121
                        ],
3,657,236✔
122
                        None,
3,657,236✔
123
                    ),
3,657,236✔
124
                },
3,657,236✔
125
                GENERATOR_SOURCE_OUTPUT_PORT,
3,657,236✔
126
            )?;
3,657,236✔
127
        }
128

129
        loop {
130
            if !self.running.load(Ordering::Relaxed) {
69✔
131
                break;
19✔
132
            }
50✔
133
            thread::sleep(Duration::from_millis(500));
50✔
134
        }
135

136
        Ok(())
19✔
137
    }
27✔
138
}
139

140
/// Handle of the first output port declared by `DualPortGeneratorSourceFactory`.
pub(crate) const DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1: PortHandle = 1000;
141
/// Handle of the second output port declared by `DualPortGeneratorSourceFactory`.
pub(crate) const DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2: PortHandle = 2000;
142

143
/// Test factory that builds `DualPortGeneratorSource` instances, which emit
/// every record on two output ports.
#[derive(Debug)]
pub(crate) struct DualPortGeneratorSourceFactory {
    /// Number of records each built source emits per port.
    count: u64,
    /// Shared flag handed to every built source; the source's `start` keeps
    /// waiting while it is `true`.
    running: Arc<AtomicBool>,
    /// When `true` both output ports are declared as
    /// `StatefulWithPrimaryKeyLookup`, otherwise as `Stateless`.
    stateful: bool,
}

impl DualPortGeneratorSourceFactory {
    /// Creates a factory.
    ///
    /// * `count` - number of records each built source emits per port.
    /// * `barrier` - shared flag stored as `running`.
    /// * `stateful` - selects the type of the declared output ports.
    pub fn new(count: u64, barrier: Arc<AtomicBool>, stateful: bool) -> Self {
        Self {
            count,
            running: barrier,
            stateful,
        }
    }
}
159

160
impl SourceFactory<NoneContext> for DualPortGeneratorSourceFactory {
161
    fn get_output_schema(
4✔
162
        &self,
4✔
163
        _port: &PortHandle,
4✔
164
    ) -> Result<(Schema, NoneContext), ExecutionError> {
4✔
165
        Ok((
4✔
166
            Schema::empty()
4✔
167
                .field(
4✔
168
                    FieldDefinition::new(
4✔
169
                        "id".to_string(),
4✔
170
                        FieldType::String,
4✔
171
                        false,
4✔
172
                        SourceDefinition::Dynamic,
4✔
173
                    ),
4✔
174
                    true,
4✔
175
                )
4✔
176
                .field(
4✔
177
                    FieldDefinition::new(
4✔
178
                        "value".to_string(),
4✔
179
                        FieldType::String,
4✔
180
                        false,
4✔
181
                        SourceDefinition::Dynamic,
4✔
182
                    ),
4✔
183
                    false,
4✔
184
                )
4✔
185
                .clone(),
4✔
186
            NoneContext {},
4✔
187
        ))
4✔
188
    }
4✔
189

190
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
9✔
191
        Ok(vec![
9✔
192
            OutputPortDef::new(
9✔
193
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
9✔
194
                if self.stateful {
9✔
195
                    OutputPortType::StatefulWithPrimaryKeyLookup {
5✔
196
                        retr_old_records_for_updates: true,
5✔
197
                        retr_old_records_for_deletes: true,
5✔
198
                    }
5✔
199
                } else {
200
                    OutputPortType::Stateless
4✔
201
                },
202
            ),
203
            OutputPortDef::new(
204
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
205
                if self.stateful {
9✔
206
                    OutputPortType::StatefulWithPrimaryKeyLookup {
5✔
207
                        retr_old_records_for_updates: true,
5✔
208
                        retr_old_records_for_deletes: true,
5✔
209
                    }
5✔
210
                } else {
211
                    OutputPortType::Stateless
4✔
212
                },
213
            ),
214
        ])
215
    }
9✔
216

217
    fn prepare(
×
218
        &self,
×
219
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
220
    ) -> Result<(), ExecutionError> {
×
221
        Ok(())
×
222
    }
×
223

224
    fn build(
2✔
225
        &self,
2✔
226
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
227
    ) -> Result<Box<dyn Source>, ExecutionError> {
2✔
228
        Ok(Box::new(DualPortGeneratorSource {
2✔
229
            count: self.count,
2✔
230
            running: self.running.clone(),
2✔
231
        }))
2✔
232
    }
2✔
233
}
234

235
/// Test source that emits `count` insert operations on each of
/// `DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1` and
/// `DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2`, then waits until `running` is
/// cleared.
#[derive(Debug)]
pub(crate) struct DualPortGeneratorSource {
    /// Number of records to emit per port.
    count: u64,
    /// Shared flag; `start` returns only after it reads `false`.
    running: Arc<AtomicBool>,
}
240

241
impl Source for DualPortGeneratorSource {
242
    fn start(
2✔
243
        &self,
2✔
244
        fw: &mut dyn SourceChannelForwarder,
2✔
245
        _from_seq: Option<(u64, u64)>,
2✔
246
    ) -> Result<(), ExecutionError> {
2✔
247
        for n in 1..(self.count + 1) {
60,000✔
248
            fw.send(
60,000✔
249
                n,
60,000✔
250
                0,
60,000✔
251
                Operation::Insert {
60,000✔
252
                    new: Record::new(
60,000✔
253
                        None,
60,000✔
254
                        vec![
60,000✔
255
                            Field::String(format!("key_{n}")),
60,000✔
256
                            Field::String(format!("value_{n}")),
60,000✔
257
                        ],
60,000✔
258
                        None,
60,000✔
259
                    ),
60,000✔
260
                },
60,000✔
261
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
60,000✔
262
            )?;
60,000✔
263
            fw.send(
60,000✔
264
                n,
60,000✔
265
                0,
60,000✔
266
                Operation::Insert {
60,000✔
267
                    new: Record::new(
60,000✔
268
                        None,
60,000✔
269
                        vec![
60,000✔
270
                            Field::String(format!("key_{n}")),
60,000✔
271
                            Field::String(format!("value_{n}")),
60,000✔
272
                        ],
60,000✔
273
                        None,
60,000✔
274
                    ),
60,000✔
275
                },
60,000✔
276
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
60,000✔
277
            )?;
60,000✔
278
        }
279
        loop {
280
            if !self.running.load(Ordering::Relaxed) {
6✔
281
                break;
2✔
282
            }
4✔
283
            thread::sleep(Duration::from_millis(500));
4✔
284
        }
285
        Ok(())
2✔
286
    }
2✔
287
}
288

289
/// Test factory that builds `NoPkGeneratorSource` instances.
#[derive(Debug)]
pub(crate) struct NoPkGeneratorSourceFactory {
    /// Number of records each built source emits.
    count: u64,
    /// Shared flag handed to every built source; the source's `start` keeps
    /// waiting while it is `true`.
    running: Arc<AtomicBool>,
    /// When `true` the output port is declared as `AutogenRowKeyLookup`,
    /// otherwise as `Stateless`.
    stateful: bool,
}

impl NoPkGeneratorSourceFactory {
    /// Creates a factory.
    ///
    /// * `count` - number of records each built source emits.
    /// * `barrier` - shared flag stored as `running`.
    /// * `stateful` - selects the type of the declared output port.
    pub fn new(count: u64, barrier: Arc<AtomicBool>, stateful: bool) -> Self {
        Self {
            count,
            running: barrier,
            stateful,
        }
    }
}
×
305

×
306
impl SourceFactory<NoneContext> for NoPkGeneratorSourceFactory {
307
    fn get_output_schema(
1✔
308
        &self,
1✔
309
        _port: &PortHandle,
1✔
310
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
311
        Ok((
1✔
312
            Schema::empty()
1✔
313
                .field(
1✔
314
                    FieldDefinition::new(
1✔
315
                        "id".to_string(),
1✔
316
                        FieldType::String,
1✔
317
                        false,
1✔
318
                        SourceDefinition::Dynamic,
1✔
319
                    ),
1✔
320
                    true,
1✔
321
                )
1✔
322
                .field(
1✔
323
                    FieldDefinition::new(
1✔
324
                        "value".to_string(),
1✔
325
                        FieldType::String,
1✔
326
                        false,
1✔
327
                        SourceDefinition::Dynamic,
1✔
328
                    ),
1✔
329
                    false,
1✔
330
                )
1✔
331
                .clone(),
1✔
332
            NoneContext {},
1✔
333
        ))
1✔
334
    }
1✔
335

×
336
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
337
        Ok(vec![OutputPortDef::new(
3✔
338
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
339
            if self.stateful {
3✔
340
                OutputPortType::AutogenRowKeyLookup
3✔
341
            } else {
×
342
                OutputPortType::Stateless
×
343
            },
344
        )])
×
345
    }
3✔
346

347
    fn prepare(
×
348
        &self,
×
349
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
350
    ) -> Result<(), ExecutionError> {
×
351
        Ok(())
×
352
    }
×
353

×
354
    fn build(
1✔
355
        &self,
1✔
356
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
357
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
358
        Ok(Box::new(NoPkGeneratorSource {
1✔
359
            count: self.count,
1✔
360
            running: self.running.clone(),
1✔
361
        }))
1✔
362
    }
1✔
363
}
×
364

×
365
/// Test source built by `NoPkGeneratorSourceFactory`: emits `count` insert
/// operations on `GENERATOR_SOURCE_OUTPUT_PORT` and then waits until `running`
/// is cleared.
#[derive(Debug)]
pub(crate) struct NoPkGeneratorSource {
    /// Number of records to emit.
    count: u64,
    /// Shared flag; `start` returns only after it reads `false`.
    running: Arc<AtomicBool>,
}
370

371
impl Source for NoPkGeneratorSource {
372
    fn start(
1✔
373
        &self,
1✔
374
        fw: &mut dyn SourceChannelForwarder,
1✔
375
        from_seq: Option<(u64, u64)>,
1✔
376
    ) -> Result<(), ExecutionError> {
1✔
377
        let start = from_seq.unwrap_or((0, 0)).0;
1✔
378

×
379
        for n in start + 1..(start + self.count + 1) {
1,000✔
380
            fw.send(
1,000✔
381
                n,
1,000✔
382
                0,
1,000✔
383
                Operation::Insert {
1,000✔
384
                    new: Record::new(
1,000✔
385
                        None,
1,000✔
386
                        vec![
1,000✔
387
                            Field::String(format!("key_{n}")),
1,000✔
388
                            Field::String(format!("value_{n}")),
1,000✔
389
                        ],
1,000✔
390
                        None,
1,000✔
391
                    ),
1,000✔
392
                },
1,000✔
393
                GENERATOR_SOURCE_OUTPUT_PORT,
1,000✔
394
            )?;
1,000✔
395
        }
×
396

×
397
        loop {
398
            if !self.running.load(Ordering::Relaxed) {
2✔
399
                break;
1✔
400
            }
1✔
401
            thread::sleep(Duration::from_millis(500));
1✔
402
        }
×
403

×
404
        Ok(())
1✔
405
    }
1✔
406
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc