• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.82
/dozer-core/src/dag/tests/dag_base_errors.rs
1
use crate::chk;
2
use crate::dag::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
3
use crate::dag::dag::{Dag, Endpoint, NodeType, DEFAULT_PORT_HANDLE};
4
use crate::dag::errors::ExecutionError;
5
use crate::dag::executor::{DagExecutor, ExecutorOptions};
6
use crate::dag::node::{
7
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink,
8
    SinkFactory, Source, SourceFactory,
9
};
10
use crate::dag::record_store::RecordReader;
11
use crate::dag::tests::dag_base_run::NoopProcessorFactory;
12
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
13
use crate::dag::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
14
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20
use std::panic;
21

22
use std::sync::atomic::AtomicBool;
23
use std::sync::Arc;
24

25
use crate::dag::epoch::Epoch;
26

27
use crate::dag::tests::app::NoneContext;
28
use tempdir::TempDir;
29

30
// Test when error is generated by a processor
31

32
#[derive(Debug)]
×
33
struct ErrorProcessorFactory {
34
    err_on: u64,
35
    panic: bool,
36
}
37

38
impl ProcessorFactory<NoneContext> for ErrorProcessorFactory {
39
    fn get_output_schema(
3✔
40
        &self,
3✔
41
        _output_port: &PortHandle,
3✔
42
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
3✔
43
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
44
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
45
    }
3✔
46

47
    fn get_input_ports(&self) -> Vec<PortHandle> {
6✔
48
        vec![DEFAULT_PORT_HANDLE]
6✔
49
    }
6✔
50

51
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
52
        vec![OutputPortDef::new(
9✔
53
            DEFAULT_PORT_HANDLE,
9✔
54
            OutputPortType::Stateless,
9✔
55
        )]
9✔
56
    }
9✔
57

58
    fn prepare(
×
59
        &self,
×
60
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
61
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
62
    ) -> Result<(), ExecutionError> {
×
63
        Ok(())
×
64
    }
×
65

66
    fn build(
3✔
67
        &self,
3✔
68
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
69
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
70
    ) -> Result<Box<dyn Processor>, ExecutionError> {
3✔
71
        Ok(Box::new(ErrorProcessor {
3✔
72
            err_on: self.err_on,
3✔
73
            count: 0,
3✔
74
            panic: self.panic,
3✔
75
        }))
3✔
76
    }
3✔
77
}
78

79
#[derive(Debug)]
×
80
struct ErrorProcessor {
81
    err_on: u64,
82
    count: u64,
83
    panic: bool,
84
}
85

86
impl Processor for ErrorProcessor {
87
    fn init(&mut self, _state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
3✔
88
        Ok(())
3✔
89
    }
3✔
90

91
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
507✔
92
        Ok(())
507✔
93
    }
507✔
94

95
    fn process(
2,399,952✔
96
        &mut self,
2,399,952✔
97
        _from_port: PortHandle,
2,399,952✔
98
        op: Operation,
2,399,952✔
99
        fw: &mut dyn ProcessorChannelForwarder,
2,399,952✔
100
        _tx: &SharedTransaction,
2,399,952✔
101
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
2,399,952✔
102
    ) -> Result<(), ExecutionError> {
2,399,952✔
103
        self.count += 1;
2,399,952✔
104
        if self.count == self.err_on {
2,399,952✔
105
            if self.panic {
3✔
106
                panic!("Generated error");
1✔
107
            } else {
108
                return Err(ExecutionError::InvalidOperation("Uknown".to_string()));
2✔
109
            }
110
        }
2,399,949✔
111

2,399,949✔
112
        fw.send(op, DEFAULT_PORT_HANDLE)
2,399,949✔
113
    }
2,399,951✔
114
}
115

116
// Pipeline: generator source -> ErrorProcessor (panics at its 800_000th
// operation) -> counting sink. The test is expected to panic.
#[test]
#[should_panic]
fn test_run_dag_proc_err_panic() {
    const RECORDS: u64 = 1_000_000;

    let mut dag = Dag::new();
    let shared_latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let failing = NodeHandle::new(Some(1), 1.to_string());
    let sink = NodeHandle::new(Some(1), 2.to_string());

    // Nodes are registered in pipeline order: source, processor, sink.
    let generator = GeneratorSourceFactory::new(RECORDS, shared_latch.clone(), false);
    dag.add_node(NodeType::Source(Arc::new(generator)), src.clone());

    let failing_factory = ErrorProcessorFactory {
        err_on: 800_000,
        panic: true,
    };
    dag.add_node(
        NodeType::Processor(Arc::new(failing_factory)),
        failing.clone(),
    );

    let counter = CountingSinkFactory::new(RECORDS, shared_latch);
    dag.add_node(NodeType::Sink(Arc::new(counter)), sink.clone());

    // source -> failing processor
    chk!(dag.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(failing.clone(), DEFAULT_PORT_HANDLE),
    ));

    // failing processor -> sink
    chk!(dag.connect(
        Endpoint::new(failing, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let tmp_dir = chk!(TempDir::new("test"));
    let executor_flag = Arc::new(AtomicBool::new(true));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        executor_flag
    ));

    chk!(executor.start());
    assert!(executor.join().is_err());
}
169

170
// Pipeline: generator source -> no-op processor -> ErrorProcessor (returns an
// error at its 800_000th operation) -> counting sink. Expected to panic.
#[test]
#[should_panic]
fn test_run_dag_proc_err_2() {
    const RECORDS: u64 = 1_000_000;

    let mut dag = Dag::new();
    let shared_latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let passthrough = NodeHandle::new(Some(1), 1.to_string());
    let failing = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    // Registration order matches the original: source, no-op, failing, sink.
    let generator = GeneratorSourceFactory::new(RECORDS, shared_latch.clone(), false);
    dag.add_node(NodeType::Source(Arc::new(generator)), src.clone());

    dag.add_node(
        NodeType::Processor(Arc::new(NoopProcessorFactory {})),
        passthrough.clone(),
    );

    let failing_factory = ErrorProcessorFactory {
        err_on: 800_000,
        panic: false,
    };
    dag.add_node(
        NodeType::Processor(Arc::new(failing_factory)),
        failing.clone(),
    );

    let counter = CountingSinkFactory::new(RECORDS, shared_latch);
    dag.add_node(NodeType::Sink(Arc::new(counter)), sink.clone());

    // source -> no-op processor
    chk!(dag.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(passthrough.clone(), DEFAULT_PORT_HANDLE),
    ));

    // no-op processor -> failing processor
    chk!(dag.connect(
        Endpoint::new(passthrough, DEFAULT_PORT_HANDLE),
        Endpoint::new(failing.clone(), DEFAULT_PORT_HANDLE),
    ));

    // failing processor -> sink
    chk!(dag.connect(
        Endpoint::new(failing, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let tmp_dir = chk!(TempDir::new("test"));
    let executor_flag = Arc::new(AtomicBool::new(true));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        executor_flag
    ));

    chk!(executor.start());
    assert!(executor.join().is_err());
}
235

236
// Pipeline: generator source -> ErrorProcessor (returns an error at its
// 800_000th operation) -> no-op processor -> counting sink. Expected to panic.
#[test]
#[should_panic]
fn test_run_dag_proc_err_3() {
    const RECORDS: u64 = 1_000_000;

    let mut dag = Dag::new();
    let shared_latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let passthrough = NodeHandle::new(Some(1), 1.to_string());
    let failing = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    // Registration order matches the original: source, failing, no-op, sink.
    let generator = GeneratorSourceFactory::new(RECORDS, shared_latch.clone(), false);
    dag.add_node(NodeType::Source(Arc::new(generator)), src.clone());

    let failing_factory = ErrorProcessorFactory {
        err_on: 800_000,
        panic: false,
    };
    dag.add_node(
        NodeType::Processor(Arc::new(failing_factory)),
        failing.clone(),
    );

    dag.add_node(
        NodeType::Processor(Arc::new(NoopProcessorFactory {})),
        passthrough.clone(),
    );

    let counter = CountingSinkFactory::new(RECORDS, shared_latch);
    dag.add_node(NodeType::Sink(Arc::new(counter)), sink.clone());

    // source -> failing processor
    chk!(dag.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(failing.clone(), DEFAULT_PORT_HANDLE),
    ));

    // failing processor -> no-op processor
    chk!(dag.connect(
        Endpoint::new(failing, DEFAULT_PORT_HANDLE),
        Endpoint::new(passthrough.clone(), DEFAULT_PORT_HANDLE),
    ));

    // no-op processor -> sink
    chk!(dag.connect(
        Endpoint::new(passthrough, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let tmp_dir = chk!(TempDir::new("test"));
    let executor_flag = Arc::new(AtomicBool::new(true));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        executor_flag
    ));

    chk!(executor.start());
    assert!(executor.join().is_err());
}
302

303
// Test when error is generated by a source
304

305
#[derive(Debug)]
×
306
pub(crate) struct ErrGeneratorSourceFactory {
307
    count: u64,
308
    err_at: u64,
309
}
310

311
impl ErrGeneratorSourceFactory {
312
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
313
        Self { count, err_at }
1✔
314
    }
1✔
315
}
316

317
impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
318
    fn get_output_schema(
1✔
319
        &self,
1✔
320
        _port: &PortHandle,
1✔
321
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
322
        Ok((
1✔
323
            Schema::empty()
1✔
324
                .field(
1✔
325
                    FieldDefinition::new(
1✔
326
                        "id".to_string(),
1✔
327
                        FieldType::String,
1✔
328
                        false,
1✔
329
                        SourceDefinition::Dynamic,
1✔
330
                    ),
1✔
331
                    true,
1✔
332
                )
1✔
333
                .field(
1✔
334
                    FieldDefinition::new(
1✔
335
                        "value".to_string(),
1✔
336
                        FieldType::String,
1✔
337
                        false,
1✔
338
                        SourceDefinition::Dynamic,
1✔
339
                    ),
1✔
340
                    false,
1✔
341
                )
1✔
342
                .clone(),
1✔
343
            NoneContext {},
1✔
344
        ))
1✔
345
    }
1✔
346

347
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
348
        Ok(vec![OutputPortDef::new(
3✔
349
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
350
            OutputPortType::Stateless,
3✔
351
        )])
3✔
352
    }
3✔
353

354
    fn prepare(
×
355
        &self,
×
356
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
357
    ) -> Result<(), ExecutionError> {
×
358
        Ok(())
×
359
    }
×
360

361
    fn build(
1✔
362
        &self,
1✔
363
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
364
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
365
        Ok(Box::new(ErrGeneratorSource {
1✔
366
            count: self.count,
1✔
367
            err_at: self.err_at,
1✔
368
        }))
1✔
369
    }
1✔
370
}
371

372
#[derive(Debug)]
×
373
pub(crate) struct ErrGeneratorSource {
374
    count: u64,
375
    err_at: u64,
376
}
377

378
impl Source for ErrGeneratorSource {
379
    fn start(
1✔
380
        &self,
1✔
381
        fw: &mut dyn SourceChannelForwarder,
1✔
382
        _from_seq: Option<(u64, u64)>,
1✔
383
    ) -> Result<(), ExecutionError> {
1✔
384
        for n in 1..(self.count + 1) {
1✔
385
            if n == self.err_at {
1✔
386
                return Err(ExecutionError::InvalidOperation(
×
387
                    "Generated Error".to_string(),
×
388
                ));
×
389
            }
1✔
390

1✔
391
            fw.send(
1✔
392
                n,
1✔
393
                0,
1✔
394
                Operation::Insert {
1✔
395
                    new: Record::new(
1✔
396
                        None,
1✔
397
                        vec![
1✔
398
                            Field::String(format!("key_{n}")),
1✔
399
                            Field::String(format!("value_{n}")),
1✔
400
                        ],
1✔
401
                        None,
1✔
402
                    ),
1✔
403
                },
1✔
404
                GENERATOR_SOURCE_OUTPUT_PORT,
1✔
405
            )?;
1✔
406
        }
407
        Ok(())
×
408
    }
1✔
409
}
410

411
// Pipeline: ErrGeneratorSource (fails at sequence 200_000) -> no-op processor
// -> counting sink. Only checks that the executor starts successfully.
#[test]
fn test_run_dag_src_err() {
    const RECORDS: u64 = 1_000_000;

    let mut dag = Dag::new();
    let shared_latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let passthrough = NodeHandle::new(Some(1), 1.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    let failing_source = ErrGeneratorSourceFactory::new(RECORDS, 200_000);
    dag.add_node(NodeType::Source(Arc::new(failing_source)), src.clone());

    dag.add_node(
        NodeType::Processor(Arc::new(NoopProcessorFactory {})),
        passthrough.clone(),
    );

    let counter = CountingSinkFactory::new(RECORDS, shared_latch);
    dag.add_node(NodeType::Sink(Arc::new(counter)), sink.clone());

    // source -> no-op processor
    chk!(dag.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(passthrough.clone(), DEFAULT_PORT_HANDLE),
    ));

    // no-op processor -> sink
    chk!(dag.connect(
        Endpoint::new(passthrough, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let tmp_dir = chk!(TempDir::new("test"));
    let executor_flag = Arc::new(AtomicBool::new(true));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        executor_flag
    ));

    executor.start().unwrap();
    // NOTE(review): the executor is never joined here (a former
    // `executor.join()` assertion was disabled), so this test does not verify
    // that the source error propagates. TODO: decide the expected `join()`
    // outcome and assert it.
}
456

457
#[derive(Debug)]
×
458
pub(crate) struct ErrSinkFactory {
459
    err_at: u64,
460
    panic: bool,
461
}
462

463
impl ErrSinkFactory {
464
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
465
        Self { err_at, panic }
2✔
466
    }
2✔
467
}
468

469
impl SinkFactory<NoneContext> for ErrSinkFactory {
470
    fn get_input_ports(&self) -> Vec<PortHandle> {
2✔
471
        vec![COUNTING_SINK_INPUT_PORT]
2✔
472
    }
2✔
473

×
474
    fn prepare(
×
475
        &self,
×
476
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
477
    ) -> Result<(), ExecutionError> {
×
478
        Ok(())
×
479
    }
×
480

481
    fn build(
2✔
482
        &self,
2✔
483
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
484
    ) -> Result<Box<dyn Sink>, ExecutionError> {
2✔
485
        Ok(Box::new(ErrSink {
2✔
486
            err_at: self.err_at,
2✔
487
            current: 0,
2✔
488
            panic: self.panic,
2✔
489
        }))
2✔
490
    }
2✔
491
}
×
492

×
493
#[derive(Debug)]
×
494
pub(crate) struct ErrSink {
×
495
    err_at: u64,
×
496
    current: u64,
×
497
    panic: bool,
×
498
}
499
impl Sink for ErrSink {
500
    fn init(&mut self, _state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
2✔
501
        Ok(())
2✔
502
    }
2✔
503

504
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
66✔
505
        Ok(())
66✔
506
    }
66✔
507

×
508
    fn process(
400,000✔
509
        &mut self,
400,000✔
510
        _from_port: PortHandle,
400,000✔
511
        _op: Operation,
400,000✔
512
        _state: &SharedTransaction,
400,000✔
513
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
400,000✔
514
    ) -> Result<(), ExecutionError> {
400,000✔
515
        self.current += 1;
400,000✔
516
        if self.current == self.err_at {
400,000✔
517
            if self.panic {
2✔
518
                panic!("Generated error");
1✔
519
            } else {
×
520
                return Err(ExecutionError::InvalidOperation(
1✔
521
                    "Generated error".to_string(),
1✔
522
                ));
1✔
523
            }
×
524
        }
399,998✔
525
        Ok(())
399,998✔
526
    }
399,999✔
527
}
×
528

×
529
// Pipeline: generator source -> no-op processor -> ErrSink (returns an error
// at its 200_000th operation). Expected to panic.
#[test]
#[should_panic]
fn test_run_dag_sink_err() {
    const RECORDS: u64 = 1_000_000;

    let mut dag = Dag::new();
    let source_latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let passthrough = NodeHandle::new(Some(1), 1.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    let generator = GeneratorSourceFactory::new(RECORDS, source_latch, false);
    dag.add_node(NodeType::Source(Arc::new(generator)), src.clone());

    dag.add_node(
        NodeType::Processor(Arc::new(NoopProcessorFactory {})),
        passthrough.clone(),
    );

    let failing_sink = ErrSinkFactory::new(200_000, false);
    dag.add_node(NodeType::Sink(Arc::new(failing_sink)), sink.clone());

    // source -> no-op processor
    chk!(dag.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(passthrough.clone(), DEFAULT_PORT_HANDLE),
    ));

    // no-op processor -> failing sink
    chk!(dag.connect(
        Endpoint::new(passthrough, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let tmp_dir = chk!(TempDir::new("test"));
    let executor_flag = Arc::new(AtomicBool::new(true));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        executor_flag
    ));

    chk!(executor.start());
    assert!(executor.join().is_err());
}
575

×
576
// Pipeline: generator source -> no-op processor -> ErrSink (panics at its
// 200_000th operation). Expected to panic.
#[test]
#[should_panic]
fn test_run_dag_sink_err_panic() {
    const RECORDS: u64 = 1_000_000;

    let mut dag = Dag::new();
    let source_latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let passthrough = NodeHandle::new(Some(1), 1.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    let generator = GeneratorSourceFactory::new(RECORDS, source_latch, false);
    dag.add_node(NodeType::Source(Arc::new(generator)), src.clone());

    dag.add_node(
        NodeType::Processor(Arc::new(NoopProcessorFactory {})),
        passthrough.clone(),
    );

    let panicking_sink = ErrSinkFactory::new(200_000, true);
    dag.add_node(NodeType::Sink(Arc::new(panicking_sink)), sink.clone());

    // source -> no-op processor
    chk!(dag.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(passthrough.clone(), DEFAULT_PORT_HANDLE),
    ));

    // no-op processor -> panicking sink
    chk!(dag.connect(
        Endpoint::new(passthrough, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let tmp_dir = chk!(TempDir::new("test"));
    let executor_flag = Arc::new(AtomicBool::new(true));
    let mut executor = chk!(DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        executor_flag
    ));

    chk!(executor.start());
    assert!(executor.join().is_err());
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc