
getdozer / dozer / build 4075835066 (pending completion)

Pull Request #790 (GitHub): refactor: Use `daggy` for the underlying data structure of `Dag`
Merge 39f3c7143 into 3223082a5

396 of 396 new or added lines in 11 files covered (100.0%)
24551 of 36528 relevant lines covered (67.21%)
54898.93 hits per line

Source file: /dozer-orchestrator/src/pipeline/sinks.rs (40.85% of lines covered)
use dozer_api::generator::protoc::generator::ProtoGenerator;
use dozer_api::grpc::internal_grpc::pipeline_response::ApiEvent;
use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_api::grpc::types_helper;
use dozer_api::{CacheEndpoint, PipelineDetails};
use dozer_cache::cache::expression::QueryExpression;
use dozer_cache::cache::index::get_primary_key;
use dozer_cache::cache::{
    lmdb_rs::{self, Transaction},
    Cache, LmdbCache,
};
use dozer_core::dag::epoch::Epoch;
use dozer_core::dag::errors::{ExecutionError, SinkError};
use dozer_core::dag::node::{PortHandle, Sink, SinkFactory};
use dozer_core::dag::record_store::RecordReader;
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::crossbeam::channel::Sender;
use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::{ApiEndpoint, ApiIndex};
use dozer_types::models::api_security::ApiSecurity;
use dozer_types::models::flags::Flags;
use dozer_types::types::FieldType;
use dozer_types::types::{IndexDefinition, Operation, Schema, SchemaIdentifier};
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::Hasher;
use std::path::PathBuf;
use std::sync::Arc;

pub fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
    let pb = ProgressBar::new_spinner();
    multi_pb.as_ref().map(|m| m.add(pb.clone()));
    pb.set_style(
        ProgressStyle::with_template("{spinner:.blue} {msg}")
            .unwrap()
            // For more spinners check out the cli-spinners project:
            // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
            .tick_strings(&[
                "▹▹▹▹▹",
                "▸▹▹▹▹",
                "▹▸▹▹▹",
                "▹▹▸▹▹",
                "▹▹▹▸▹",
                "▹▹▹▹▸",
                "▪▪▪▪▪",
            ]),
    );
    pb
}
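
// For illustration only (a hypothetical helper, not part of this file): how a
// caller typically drives `attach_progress`, assuming the `indicatif` API
// re-exported by `dozer_types` matches upstream `indicatif` 0.17.
#[allow(dead_code)]
fn attach_progress_example() {
    // One shared `MultiProgress` lets several sinks draw spinners together.
    let multi_pb = MultiProgress::new();
    // Each sink attaches its own spinner to it (see `CacheSink::new` below).
    let pb = attach_progress(Some(multi_pb));
    // The message is refreshed on every commit (see `CacheSink::commit`).
    pb.set_message("films: Count: 0".to_string());
}
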
#[derive(Debug, Clone)]
pub struct CacheSinkSettings {
    flags: Option<Flags>,
    api_security: Option<ApiSecurity>,
}
impl CacheSinkSettings {
    pub fn new(flags: Option<Flags>, api_security: Option<ApiSecurity>) -> Self {
        Self {
            flags,
            api_security,
        }
    }
}
#[derive(Debug)]
pub struct CacheSinkFactory {
    input_ports: Vec<PortHandle>,
    cache: Arc<LmdbCache>,
    api_endpoint: ApiEndpoint,
    notifier: Option<Sender<PipelineResponse>>,
    generated_path: PathBuf,
    multi_pb: MultiProgress,
    settings: CacheSinkSettings,
}

impl CacheSinkFactory {
    pub fn new(
        input_ports: Vec<PortHandle>,
        cache: Arc<LmdbCache>,
        api_endpoint: ApiEndpoint,
        notifier: Option<Sender<PipelineResponse>>,
        generated_path: PathBuf,
        multi_pb: MultiProgress,
        settings: CacheSinkSettings,
    ) -> Self {
        Self {
            input_ports,
            cache,
            api_endpoint,
            notifier,
            generated_path,
            multi_pb,
            settings,
        }
    }

    fn get_output_schema(
        &self,
        schema: &Schema,
    ) -> Result<(Schema, Vec<IndexDefinition>), ExecutionError> {
        let mut schema = schema.clone();

        // Get hash of schema
        let hash = self.get_schema_hash();

        // Generated Cache index based on api_index
        let configured_index = create_primary_indexes(
            &schema,
            &self.api_endpoint.index.to_owned().unwrap_or_default(),
        )?;
        // Generated schema in SQL
        let upstream_index = schema.primary_index.clone();

        let index = match (configured_index.is_empty(), upstream_index.is_empty()) {
            (true, true) => vec![],
            (true, false) => upstream_index,
            (false, true) => configured_index,
            (false, false) => {
                if !upstream_index.eq(&configured_index) {
                    return Err(ExecutionError::MismatchPrimaryKey {
                        endpoint_name: self.api_endpoint.name.clone(),
                        expected: get_field_names(&schema, &upstream_index),
                        actual: get_field_names(&schema, &configured_index),
                    });
                }
                configured_index
            }
        };

        schema.primary_index = index;

        schema.identifier = Some(SchemaIdentifier {
            id: hash as u32,
            version: 1,
        });

        // Automatically create secondary indexes
        let secondary_indexes = schema
            .fields
            .iter()
            .enumerate()
            .flat_map(|(idx, f)| match f.typ {
                // Create sorted inverted indexes for these fields
                FieldType::UInt
                | FieldType::Int
                | FieldType::Float
                | FieldType::Boolean
                | FieldType::Decimal
                | FieldType::Timestamp
                | FieldType::Date => vec![IndexDefinition::SortedInverted(vec![idx])],

                // Create sorted inverted and full text indexes for string fields.
                FieldType::String => vec![
                    IndexDefinition::SortedInverted(vec![idx]),
                    IndexDefinition::FullText(idx),
                ],

                // Create full text indexes for text fields
                FieldType::Text => vec![IndexDefinition::FullText(idx)],

                // Skip creating indexes
                FieldType::Binary | FieldType::Bson => vec![],
            })
            .collect();
        Ok((schema, secondary_indexes))
    }

    fn get_schema_hash(&self) -> u64 {
        // Get hash of SQL
        let mut hasher = DefaultHasher::new();
        let name = self
            .api_endpoint
            .sql
            .as_ref()
            .map_or(self.api_endpoint.name.clone(), |sql| sql.clone());
        hasher.write(name.as_bytes());

        hasher.finish()
    }
}

impl SinkFactory<SchemaSQLContext> for CacheSinkFactory {
    fn get_input_ports(&self) -> Vec<PortHandle> {
        self.input_ports.clone()
    }

    fn prepare(
        &self,
        input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        use std::println as stdinfo;
        // Insert schemas into cache
        debug!(
            "SinkFactory: Initialising CacheSinkFactory: {}",
            self.api_endpoint.name
        );
        for (_, (schema, _ctx)) in input_schemas.iter() {
            stdinfo!(
                "SINK: Initializing output schema: {}",
                self.api_endpoint.name
            );
            let (pipeline_schema, secondary_indexes) = self.get_output_schema(schema)?;
            pipeline_schema.print().printstd();

            if self
                .cache
                .get_schema_and_indexes_by_name(&self.api_endpoint.name)
                .is_err()
            {
                self.cache
                    .insert_schema(
                        &self.api_endpoint.name,
                        &pipeline_schema,
                        &secondary_indexes,
                    )
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::SchemaUpdateFailed(Box::new(e)))
                    })?;
                debug!(
                    "SinkFactory: Inserted schema for {}",
                    self.api_endpoint.name
                );
            }
        }

        ProtoGenerator::generate(
            &self.generated_path,
            PipelineDetails {
                schema_name: self.api_endpoint.name.to_owned(),
                cache_endpoint: CacheEndpoint {
                    cache: self.cache.to_owned(),
                    endpoint: self.api_endpoint.to_owned(),
                },
            },
            &self.settings.api_security,
            &self.settings.flags,
        )
        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;

        Ok(())
    }

    fn build(
        &self,
        input_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Sink>, ExecutionError> {
        let mut sink_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)> = HashMap::new();
        // Insert schemas into cache
        for (k, schema) in input_schemas {
            let (schema, secondary_indexes) = self.get_output_schema(&schema)?;
            sink_schemas.insert(k, (schema, secondary_indexes));
        }
        Ok(Box::new(CacheSink::new(
            self.cache.clone(),
            self.api_endpoint.clone(),
            sink_schemas,
            self.notifier.clone(),
            Some(self.multi_pb.clone()),
        )))
    }
}

fn create_primary_indexes(
    schema: &Schema,
    api_index: &ApiIndex,
) -> Result<Vec<usize>, ExecutionError> {
    let mut primary_index = Vec::new();
    for name in api_index.primary_key.iter() {
        let idx = schema
            .fields
            .iter()
            .position(|fd| fd.name == name.clone())
            .map_or(Err(ExecutionError::FieldNotFound(name.to_owned())), |p| {
                Ok(p)
            })?;

        primary_index.push(idx);
    }
    Ok(primary_index)
}

fn get_field_names(schema: &Schema, indexes: &[usize]) -> Vec<String> {
    indexes
        .iter()
        .map(|idx| schema.fields[*idx].name.to_owned())
        .collect()
}

#[derive(Debug)]
pub struct CacheSink {
    // It's not really 'static, the actual lifetime is the lifetime of `cache`. See comments in `process`.
    txn: Option<lmdb_rs::RwTransaction<'static>>,
    cache: Arc<LmdbCache>,
    counter: usize,
    input_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)>,
    api_endpoint: ApiEndpoint,
    pb: ProgressBar,
    notifier: Option<Sender<PipelineResponse>>,
}

impl Sink for CacheSink {
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        // Update Counter on commit
        self.pb.set_message(format!(
            "{}: Count: {}",
            self.api_endpoint.name.to_owned(),
            self.counter,
        ));
        if let Some(txn) = self.txn.take() {
            txn.commit().map_err(|e| {
                ExecutionError::SinkError(SinkError::CacheCommitTransactionFailed(Box::new(e)))
            })?;
        }
        Ok(())
    }

    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        let query = QueryExpression::new(None, vec![], None, 0);
        self.counter = self
            .cache
            .count(&self.api_endpoint.name, &query)
            .map_err(|e| ExecutionError::SinkError(SinkError::CacheCountFailed(Box::new(e))))?;

        debug!(
            "SINK: Initialising CacheSink: {} with count: {}",
            self.api_endpoint.name, self.counter
        );
        Ok(())
    }

    fn process(
        &mut self,
        from_port: PortHandle,
        op: Operation,
        _tx: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        self.counter += 1;

        if self.txn.is_none() {
            let txn = self.cache.begin_rw_txn().map_err(|e| {
                ExecutionError::SinkError(SinkError::CacheBeginTransactionFailed(Box::new(e)))
            })?;
            // SAFETY:
            // 1. `std::mem::transmute` is only used to extend the lifetime of `txn` to `'static`.
            // 2. `RwTransaction` doesn't reference data in `LmdbCache`, the lifetime of it is only
            // to ensure that the returned `RwTransaction` does not outlive `LmdbCache`.
            // 3. `txn` in `CacheSink` is private, and we don't expose it to the outside, so the one owning
            // `txn` must own `CacheSink`.
            // 4. The declaration order in `CacheSink` ensures `txn` is dropped before `cache`.
            let txn = unsafe {
                std::mem::transmute::<lmdb_rs::RwTransaction<'_>, lmdb_rs::RwTransaction<'static>>(
                    txn,
                )
            };
            self.txn = Some(txn);
        }
        let txn = self.txn.as_mut().unwrap();

        let (schema, secondary_indexes) = self
            .input_schemas
            .get(&from_port)
            .ok_or(ExecutionError::SchemaNotInitialized)?;

        if let Some(notifier) = &self.notifier {
            let op = types_helper::map_operation(self.api_endpoint.name.to_owned(), &op);
            notifier
                .try_send(PipelineResponse {
                    endpoint: self.api_endpoint.name.to_owned(),
                    api_event: Some(ApiEvent::Op(op)),
                })
                .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
        }
        match op {
            Operation::Delete { mut old } => {
                old.schema_id = schema.identifier;
                let key = get_primary_key(&schema.primary_index, &old.values);
                self.cache
                    .delete_with_txn(txn, &key, &old, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheDeleteFailed(Box::new(e)))
                    })?;
            }
            Operation::Insert { mut new } => {
                new.schema_id = schema.identifier;
                self.cache
                    .insert_with_txn(txn, &new, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheInsertFailed(Box::new(e)))
                    })?;
            }
            Operation::Update { mut old, mut new } => {
                old.schema_id = schema.identifier;
                new.schema_id = schema.identifier;
                let key = get_primary_key(&schema.primary_index, &old.values);
                self.cache
                    .update_with_txn(txn, &key, &old, &new, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheUpdateFailed(Box::new(e)))
                    })?;
            }
        }

        Ok(())
    }
}

impl CacheSink {
    pub fn new(
        cache: Arc<LmdbCache>,
        api_endpoint: ApiEndpoint,
        input_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)>,
        notifier: Option<Sender<PipelineResponse>>,
        multi_pb: Option<MultiProgress>,
    ) -> Self {
        let pb = attach_progress(multi_pb);
        Self {
            txn: None,
            cache,
            counter: 0,
            input_schemas,
            api_endpoint,
            pb,
            notifier,
        }
    }
}

#[cfg(test)]
mod tests {

    use crate::test_utils;
    use dozer_cache::cache::{index, Cache};

    use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
    use dozer_core::dag::node::{NodeHandle, Sink};
    use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;

    use dozer_types::types::{Field, IndexDefinition, Operation, Record, SchemaIdentifier};
    use std::collections::HashMap;
    use tempdir::TempDir;

    #[test]
    // This test case covers updating a record when the primary key changes
    // because of a value change in one of the primary key fields.
    fn update_record_when_primary_changes() {
        let tmp_dir = TempDir::new("example").unwrap();
        let env = LmdbEnvironmentManager::create(tmp_dir.path(), "test").unwrap();
        let txn = env.create_txn().unwrap();

        let schema = test_utils::get_schema();
        let secondary_indexes: Vec<IndexDefinition> = schema
            .fields
            .iter()
            .enumerate()
            .map(|(idx, _f)| IndexDefinition::SortedInverted(vec![idx]))
            .collect();

        let (cache, mut sink) = test_utils::init_sink(&schema, secondary_indexes.clone());

        let mut input_schemas = HashMap::new();
        input_schemas.insert(DEFAULT_PORT_HANDLE, schema.clone());
        // sink.update_schema(&input_schemas).unwrap();

        // Initializing schemas
        cache
            .insert_schema("films", &schema, &secondary_indexes)
            .unwrap();

        let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];

        let updated_values = vec![
            Field::Int(2),
            Field::String("Film name updated".to_string()),
        ];

        let insert_operation = Operation::Insert {
            new: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: initial_values.clone(),
                version: None,
            },
        };

        let update_operation = Operation::Update {
            old: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: initial_values.clone(),
                version: None,
            },
            new: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: updated_values.clone(),
                version: None,
            },
        };

        sink.process(DEFAULT_PORT_HANDLE, insert_operation, &txn, &HashMap::new())
            .unwrap();
        sink.commit(
            &dozer_core::dag::epoch::Epoch::from(
                0,
                NodeHandle::new(Some(DEFAULT_PORT_HANDLE), "".to_string()),
                0,
                0,
            ),
            &txn,
        )
        .unwrap();

        let key = index::get_primary_key(&schema.primary_index, &initial_values);
        let record = cache.get(&key).unwrap();

        assert_eq!(initial_values, record.values);

        sink.process(DEFAULT_PORT_HANDLE, update_operation, &txn, &HashMap::new())
            .unwrap();
        let epoch1 = dozer_core::dag::epoch::Epoch::from(
            0,
            NodeHandle::new(Some(DEFAULT_PORT_HANDLE), "".to_string()),
            0,
            1,
        );
        sink.commit(&epoch1, &txn).unwrap();

        // Primary key with old values
        let key = index::get_primary_key(&schema.primary_index, &initial_values);

        let record = cache.get(&key);

        assert!(record.is_err());

        // Primary key with updated values
        let key = index::get_primary_key(&schema.primary_index, &updated_values);
        let record = cache.get(&key).unwrap();

        assert_eq!(updated_values, record.values);
    }
}
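
A note on the `unsafe` block in `CacheSink::process`: `CacheSink` owns both the cache and a write transaction that borrows from it, a self-referential shape Rust lifetimes cannot express directly, so the transaction's lifetime is erased to `'static` and the SAFETY comment's invariants (drop order, no external exposure) keep the erased borrow sound. Below is a minimal self-contained sketch of the same pattern; `Store`, `Txn`, and `SelfRefSink` are hypothetical stand-ins, not dozer types.

use std::sync::Arc;

struct Store; // stands in for LmdbCache

#[allow(dead_code)]
struct Txn<'a> {
    store: &'a Store, // stands in for lmdb_rs::RwTransaction<'a>
}

struct SelfRefSink {
    // Declared before `store`: struct fields drop in declaration order, so
    // the borrow is always gone before its target (point 4 of the SAFETY
    // comment above).
    txn: Option<Txn<'static>>,
    // Arc gives the Store a stable address, so moving `SelfRefSink` does not
    // invalidate the reference held by `txn` (mirroring `Arc<LmdbCache>`).
    store: Arc<Store>,
}

impl SelfRefSink {
    fn ensure_txn(&mut self) {
        if self.txn.is_none() {
            let txn = Txn { store: &*self.store };
            // SAFETY: `txn` is dropped before `store` (field order), is never
            // handed out of this struct, and only the lifetime is changed.
            let txn = unsafe { std::mem::transmute::<Txn<'_>, Txn<'static>>(txn) };
            self.txn = Some(txn);
        }
    }

    fn commit(&mut self) {
        // Taking the transaction out ends the erased borrow explicitly,
        // mirroring how `CacheSink::commit` takes and commits `self.txn`.
        if let Some(txn) = self.txn.take() {
            drop(txn);
        }
    }
}

fn main() {
    let mut sink = SelfRefSink { txn: None, store: Arc::new(Store) };
    sink.ensure_txn();
    sink.commit();
}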