
getdozer / dozer / build 4023928230 (pending completion)

Pull Request #744 (GitHub): feat: Implement direct insert to cache pipeline
Merge fe141bf50 into 70bd6e0ad

243 of 243 new or added lines in 16 files covered (100.0%)
23197 of 34900 relevant lines covered (66.47%)
45617.34 hits per line

Source File: /dozer-orchestrator/src/pipeline/sinks.rs (42.28% of lines covered)
use dozer_api::generator::protoc::generator::ProtoGenerator;
use dozer_api::grpc::internal_grpc::pipeline_response::ApiEvent;
use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_api::grpc::types_helper;
use dozer_api::{CacheEndpoint, PipelineDetails};
use dozer_cache::cache::expression::QueryExpression;
use dozer_cache::cache::index::get_primary_key;
use dozer_cache::cache::{
    lmdb_rs::{self, Transaction},
    Cache, LmdbCache,
};
use dozer_core::dag::epoch::Epoch;
use dozer_core::dag::errors::{ExecutionError, SinkError};
use dozer_core::dag::node::{PortHandle, Sink, SinkFactory};
use dozer_core::dag::record_store::RecordReader;
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::crossbeam::channel::Sender;
use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::{ApiEndpoint, ApiIndex};
use dozer_types::models::api_security::ApiSecurity;
use dozer_types::models::flags::Flags;
use dozer_types::types::FieldType;
use dozer_types::types::{IndexDefinition, Operation, Schema, SchemaIdentifier};
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::Hasher;
use std::path::PathBuf;
use std::sync::Arc;

pub fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
    let pb = ProgressBar::new_spinner();
    if let Some(m) = multi_pb.as_ref() {
        m.add(pb.clone());
    }
    pb.set_style(
        ProgressStyle::with_template("{spinner:.blue} {msg}")
            .unwrap()
            // For more spinners check out the cli-spinners project:
            // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
            .tick_strings(&[
                "▹▹▹▹▹",
                "▸▹▹▹▹",
                "▹▸▹▹▹",
                "▹▹▸▹▹",
                "▹▹▹▸▹",
                "▹▹▹▹▸",
                "▪▪▪▪▪",
            ]),
    );
    pb
}
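
// Usage sketch (illustrative; not part of the original file): each sink hooks
// its spinner into a shared `MultiProgress` so several endpoints can report
// progress side by side. Assuming only the indicatif API re-exported above:
//
//     let multi = MultiProgress::new();
//     let pb = attach_progress(Some(multi.clone()));
//     pb.set_message("films: Count: 0");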
#[derive(Debug, Clone)]
pub struct CacheSinkSettings {
    flags: Option<Flags>,
    api_security: Option<ApiSecurity>,
}
impl CacheSinkSettings {
    pub fn new(flags: Option<Flags>, api_security: Option<ApiSecurity>) -> Self {
        Self {
            flags,
            api_security,
        }
    }
}
#[derive(Debug)]
pub struct CacheSinkFactory {
    input_ports: Vec<PortHandle>,
    cache: Arc<LmdbCache>,
    api_endpoint: ApiEndpoint,
    notifier: Option<Sender<PipelineResponse>>,
    generated_path: PathBuf,
    multi_pb: MultiProgress,
    settings: CacheSinkSettings,
}

impl CacheSinkFactory {
    pub fn new(
        input_ports: Vec<PortHandle>,
        cache: Arc<LmdbCache>,
        api_endpoint: ApiEndpoint,
        notifier: Option<Sender<PipelineResponse>>,
        generated_path: PathBuf,
        multi_pb: MultiProgress,
        settings: CacheSinkSettings,
    ) -> Self {
        Self {
            input_ports,
            cache,
            api_endpoint,
            notifier,
            generated_path,
            multi_pb,
            settings,
        }
    }

    fn get_output_schema(
        &self,
        schema: &Schema,
    ) -> Result<(Schema, Vec<IndexDefinition>), ExecutionError> {
        let mut schema = schema.clone();

        // Get hash of schema
        let hash = self.get_schema_hash();

        // Generate the cache index based on `api_index`
        let configured_index = create_primary_indexes(
            &schema,
            &self.api_endpoint.index.to_owned().unwrap_or_default(),
        )?;
        // Primary index coming from the upstream SQL-generated schema
        let upstream_index = schema.primary_index.clone();

        let index = match (configured_index.is_empty(), upstream_index.is_empty()) {
            (true, true) => vec![],
            (true, false) => upstream_index,
            (false, true) => configured_index,
            (false, false) => {
                if !upstream_index.eq(&configured_index) {
                    return Err(ExecutionError::MismatchPrimaryKey {
                        endpoint_name: self.api_endpoint.name.clone(),
                        expected: get_field_names(&schema, &upstream_index),
                        actual: get_field_names(&schema, &configured_index),
                    });
                }
                configured_index
            }
        };

        schema.primary_index = index;

        schema.identifier = Some(SchemaIdentifier {
            id: hash as u32,
            version: 1,
        });

        // Automatically create secondary indexes
        let secondary_indexes = schema
            .fields
            .iter()
            .enumerate()
            .flat_map(|(idx, f)| match f.typ {
                // Create sorted inverted indexes for these fields
                FieldType::UInt
                | FieldType::Int
                | FieldType::Float
                | FieldType::Boolean
                | FieldType::Decimal
                | FieldType::Timestamp
                | FieldType::Date => vec![IndexDefinition::SortedInverted(vec![idx])],

                // Create sorted inverted and full text indexes for string fields.
                FieldType::String => vec![
                    IndexDefinition::SortedInverted(vec![idx]),
                    IndexDefinition::FullText(idx),
                ],

                // Create full text indexes for text fields
                FieldType::Text => vec![IndexDefinition::FullText(idx)],

                // Skip creating indexes
                FieldType::Binary | FieldType::Bson => vec![],
            })
            .collect();
        Ok((schema, secondary_indexes))
    }

    fn get_schema_hash(&self) -> u64 {
        // Hash the SQL string if present, otherwise the endpoint name
        let mut hasher = DefaultHasher::new();
        let name = self
            .api_endpoint
            .sql
            .as_ref()
            .map_or(self.api_endpoint.name.clone(), |sql| sql.clone());
        hasher.write(name.as_bytes());

        hasher.finish()
    }
}
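
// Illustration (not part of the original file) of the automatic secondary
// indexes chosen by `get_output_schema`: for a hypothetical schema with fields
// (id: Int, title: String, body: Text), the `flat_map` above yields, in order:
//
//     vec![
//         IndexDefinition::SortedInverted(vec![0]), // id: sorted inverted only
//         IndexDefinition::SortedInverted(vec![1]), // title: sorted inverted ...
//         IndexDefinition::FullText(1),             // ... plus full text
//         IndexDefinition::FullText(2),             // body: full text only
//     ]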

impl SinkFactory<SchemaSQLContext> for CacheSinkFactory {
    fn set_input_schema(
        &self,
        _input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn get_input_ports(&self) -> Vec<PortHandle> {
        self.input_ports.clone()
    }

    fn prepare(
        &self,
        input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        use std::println as stdinfo;
        // Insert schemas into the cache
        debug!(
            "SinkFactory: Initialising CacheSinkFactory: {}",
            self.api_endpoint.name
        );
        for (_, (schema, _ctx)) in input_schemas.iter() {
            stdinfo!(
                "SINK: Initializing output schema: {}",
                self.api_endpoint.name
            );
            let (pipeline_schema, secondary_indexes) = self.get_output_schema(schema)?;
            pipeline_schema.print().printstd();

            if self
                .cache
                .get_schema_and_indexes_by_name(&self.api_endpoint.name)
                .is_err()
            {
                self.cache
                    .insert_schema(
                        &self.api_endpoint.name,
                        &pipeline_schema,
                        &secondary_indexes,
                    )
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::SchemaUpdateFailed(Box::new(e)))
                    })?;
                debug!(
                    "SinkFactory: Inserted schema for {}",
                    self.api_endpoint.name
                );
            }
        }

        ProtoGenerator::generate(
            &self.generated_path,
            PipelineDetails {
                schema_name: self.api_endpoint.name.to_owned(),
                cache_endpoint: CacheEndpoint {
                    cache: self.cache.to_owned(),
                    endpoint: self.api_endpoint.to_owned(),
                },
            },
            &self.settings.api_security,
            &self.settings.flags,
        )
        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;

        Ok(())
    }

    fn build(
        &self,
        input_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Sink>, ExecutionError> {
        let mut sink_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)> = HashMap::new();
        // Resolve the output schema and secondary indexes for each input port
        for (k, schema) in input_schemas {
            let (schema, secondary_indexes) = self.get_output_schema(&schema)?;
            sink_schemas.insert(k, (schema, secondary_indexes));
        }
        Ok(Box::new(CacheSink::new(
            self.cache.clone(),
            self.api_endpoint.clone(),
            sink_schemas,
            self.notifier.clone(),
            Some(self.multi_pb.clone()),
        )))
    }
}

fn create_primary_indexes(
    schema: &Schema,
    api_index: &ApiIndex,
) -> Result<Vec<usize>, ExecutionError> {
    let mut primary_index = Vec::new();
    for name in api_index.primary_key.iter() {
        let idx = schema
            .fields
            .iter()
            .position(|fd| fd.name == *name)
            .ok_or_else(|| ExecutionError::FieldNotFound(name.to_owned()))?;

        primary_index.push(idx);
    }
    Ok(primary_index)
}

fn get_field_names(schema: &Schema, indexes: &[usize]) -> Vec<String> {
    indexes
        .iter()
        .map(|idx| schema.fields[*idx].name.to_owned())
        .collect()
}
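
// Illustration (not part of the original file): for a hypothetical schema with
// fields ["film_id", "film_name"], `create_primary_indexes` maps
// `api_index.primary_key = ["film_id"]` to `Ok(vec![0])`, while an unknown
// name such as "missing" yields `Err(ExecutionError::FieldNotFound("missing"))`.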

#[derive(Debug)]
pub struct CacheSink {
    // It's not really 'static; the actual lifetime is that of `cache`. See the
    // SAFETY comments in `process`.
    txn: Option<lmdb_rs::RwTransaction<'static>>,
    cache: Arc<LmdbCache>,
    counter: usize,
    input_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)>,
    api_endpoint: ApiEndpoint,
    pb: ProgressBar,
    notifier: Option<Sender<PipelineResponse>>,
}

impl Sink for CacheSink {
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        // Update the counter display on commit
        self.pb.set_message(format!(
            "{}: Count: {}",
            self.api_endpoint.name.to_owned(),
            self.counter,
        ));
        if let Some(txn) = self.txn.take() {
            txn.commit().map_err(|e| {
                ExecutionError::SinkError(SinkError::CacheCommitTransactionFailed(Box::new(e)))
            })?;
        }
        Ok(())
    }

    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        let query = QueryExpression::new(None, vec![], None, 0);
        self.counter = self
            .cache
            .count(&self.api_endpoint.name, &query)
            .map_err(|e| ExecutionError::SinkError(SinkError::CacheCountFailed(Box::new(e))))?;

        debug!(
            "SINK: Initialising CacheSink: {} with count: {}",
            self.api_endpoint.name, self.counter
        );
        Ok(())
    }

    fn process(
        &mut self,
        from_port: PortHandle,
        op: Operation,
        _tx: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        self.counter += 1;

        if self.txn.is_none() {
            let txn = self.cache.begin_rw_txn().map_err(|e| {
                ExecutionError::SinkError(SinkError::CacheBeginTransactionFailed(Box::new(e)))
            })?;
            // SAFETY:
            // 1. `std::mem::transmute` is only used to extend the lifetime of `txn` to `'static`.
            // 2. `RwTransaction` doesn't reference data in `LmdbCache`; its lifetime parameter
            //    only ensures that the returned `RwTransaction` does not outlive `LmdbCache`.
            // 3. `txn` is private to `CacheSink` and never exposed, so whoever owns `txn`
            //    must own `CacheSink`.
            // 4. The field declaration order in `CacheSink` ensures `txn` is dropped before `cache`.
            let txn = unsafe {
                std::mem::transmute::<lmdb_rs::RwTransaction<'_>, lmdb_rs::RwTransaction<'static>>(
                    txn,
                )
            };
            self.txn = Some(txn);
        }
        let txn = self.txn.as_mut().unwrap();

        let (schema, secondary_indexes) = self
            .input_schemas
            .get(&from_port)
            .ok_or(ExecutionError::SchemaNotInitialized)?;

        if let Some(notifier) = &self.notifier {
            let op = types_helper::map_operation(self.api_endpoint.name.to_owned(), &op);
            notifier
                .try_send(PipelineResponse {
                    endpoint: self.api_endpoint.name.to_owned(),
                    api_event: Some(ApiEvent::Op(op)),
                })
                .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
        }
        match op {
            Operation::Delete { mut old } => {
                old.schema_id = schema.identifier;
                let key = get_primary_key(&schema.primary_index, &old.values);
                self.cache
                    .delete_with_txn(txn, &key, &old, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheDeleteFailed(Box::new(e)))
                    })?;
            }
            Operation::Insert { mut new } => {
                new.schema_id = schema.identifier;
                self.cache
                    .insert_with_txn(txn, &new, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheInsertFailed(Box::new(e)))
                    })?;
            }
            Operation::Update { mut old, mut new } => {
                old.schema_id = schema.identifier;
                new.schema_id = schema.identifier;
                let key = get_primary_key(&schema.primary_index, &old.values);
                self.cache
                    .update_with_txn(txn, &key, &old, &new, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheUpdateFailed(Box::new(e)))
                    })?;
            }
        }

        Ok(())
    }
}
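
// A minimal sketch (illustrative only, with hypothetical `Tx` and `Env` types)
// of the drop-order argument the SAFETY comment above relies on: Rust drops
// struct fields in declaration order, so declaring `txn` before `cache` in
// `CacheSink` guarantees the open transaction is finished before the cache and
// its LMDB environment are torn down:
//
//     struct Holder {
//         txn: Option<Tx<'static>>, // declared first, dropped first
//         env: Arc<Env>,            // dropped second, so it outlives `txn`
//     }
//
// The `'static` is a fiction local to this module; soundness rests on the drop
// order and on `txn` never escaping `CacheSink`.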

impl CacheSink {
    pub fn new(
        cache: Arc<LmdbCache>,
        api_endpoint: ApiEndpoint,
        input_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)>,
        notifier: Option<Sender<PipelineResponse>>,
        multi_pb: Option<MultiProgress>,
    ) -> Self {
        let pb = attach_progress(multi_pb);
        Self {
            txn: None,
            cache,
            counter: 0,
            input_schemas,
            api_endpoint,
            pb,
            notifier,
        }
    }
}

#[cfg(test)]
mod tests {

    use crate::test_utils;
    use dozer_cache::cache::{index, Cache};

    use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
    use dozer_core::dag::node::{NodeHandle, Sink};
    use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;

    use dozer_types::types::{Field, IndexDefinition, Operation, Record, SchemaIdentifier};
    use std::collections::HashMap;
    use tempdir::TempDir;

    #[test]
    // This test covers the update of a record whose primary key changes
    // because a value inside the primary key changed.
    fn update_record_when_primary_changes() {
        let tmp_dir = TempDir::new("example").unwrap();
        let env = LmdbEnvironmentManager::create(tmp_dir.path(), "test").unwrap();
        let txn = env.create_txn().unwrap();

        let schema = test_utils::get_schema();
        let secondary_indexes: Vec<IndexDefinition> = schema
            .fields
            .iter()
            .enumerate()
            .map(|(idx, _f)| IndexDefinition::SortedInverted(vec![idx]))
            .collect();

        let (cache, mut sink) = test_utils::init_sink(&schema, secondary_indexes.clone());

        let mut input_schemas = HashMap::new();
        input_schemas.insert(DEFAULT_PORT_HANDLE, schema.clone());
        // sink.update_schema(&input_schemas).unwrap();

        // Initialize the schema in the cache
        cache
            .insert_schema("films", &schema, &secondary_indexes)
            .unwrap();

        let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];

        let updated_values = vec![
            Field::Int(2),
            Field::String("Film name updated".to_string()),
        ];

        let insert_operation = Operation::Insert {
            new: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: initial_values.clone(),
                version: None,
            },
        };

        let update_operation = Operation::Update {
            old: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: initial_values.clone(),
                version: None,
            },
            new: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: updated_values.clone(),
                version: None,
            },
        };

        sink.process(DEFAULT_PORT_HANDLE, insert_operation, &txn, &HashMap::new())
            .unwrap();
        sink.commit(
            &dozer_core::dag::epoch::Epoch::new(
                0,
                [(
                    NodeHandle::new(Some(DEFAULT_PORT_HANDLE), "".to_string()),
                    (0_u64, 0_u64),
                )]
                .into_iter()
                .collect(),
            ),
            &txn,
        )
        .unwrap();

        let key = index::get_primary_key(&schema.primary_index, &initial_values);
        let record = cache.get(&key).unwrap();

        assert_eq!(initial_values, record.values);

        sink.process(DEFAULT_PORT_HANDLE, update_operation, &txn, &HashMap::new())
            .unwrap();
        let epoch1 = dozer_core::dag::epoch::Epoch::new(
            0,
            [(
                NodeHandle::new(Some(DEFAULT_PORT_HANDLE), "".to_string()),
                (0_u64, 1_u64),
            )]
            .into_iter()
            .collect(),
        );
        sink.commit(&epoch1, &txn).unwrap();

        // Primary key with old values
        let key = index::get_primary_key(&schema.primary_index, &initial_values);

        let record = cache.get(&key);

        assert!(record.is_err());

        // Primary key with updated values
        let key = index::get_primary_key(&schema.primary_index, &updated_values);
        let record = cache.get(&key).unwrap();

        assert_eq!(updated_values, record.values);
    }
}
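
// To run this test on its own (assuming the crate is named after its directory,
// dozer-orchestrator):
//
//     cargo test -p dozer-orchestrator update_record_when_primary_changes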