• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/simple/tests.rs
1
use std::{
2
    fs,
3
    sync::{
4
        atomic::{AtomicBool, Ordering},
5
        Arc,
6
    },
7
    thread,
8
    time::Duration,
9
};
10

11
use dozer_api::CacheEndpoint;
12
use dozer_cache::cache::{expression::QueryExpression, test_utils, Cache, CacheOptions, LmdbCache};
13
use dozer_ingestion::ingestion::{IngestionConfig, Ingestor};
14
use dozer_types::{
15
    ingestion_types::IngestionMessage,
16
    log::warn,
17
    models::{
18
        self,
19
        api_endpoint::{ApiEndpoint, ApiIndex},
20
        connection::EventsAuthentication,
21
        flags::Flags,
22
    },
23
    types::{Field, OperationEvent, Record, Schema},
24
};
25
use serde_json::{json, Value};
26
use tempdir::TempDir;
27

28
use crate::pipeline::CacheSinkSettings;
29

30
use super::executor::Executor;
31

32
/// Drives a single source -> cache pipeline end to end for the given `schema`:
/// builds an "events" source and cache endpoint, spawns an `Executor` on a
/// background thread, feeds seven insert events through the ingestion channel,
/// waits for them to be processed, then queries the cache and asserts all
/// seven records are present.
fn single_source_sink_impl(schema: Schema) {
    // Source backed by the Events connector — an in-process event feed,
    // so no external database is needed for this test.
    let source = models::source::Source {
        id: Some("1".to_string()),
        name: "events".to_string(),
        table_name: "events".to_string(),
        columns: vec![],
        connection: Some(models::connection::Connection {
            authentication: Some(models::connection::Authentication::Events(
                EventsAuthentication::default(),
            )),
            id: Some("1".to_string()),
            db_type: models::connection::DBType::Events as i32,
            name: "events".to_string(),
            ..Default::default()
        }),
        refresh_config: Some(models::source::RefreshConfig::default()),
        ..Default::default()
    };

    let table_name = "events";
    // Cache the sink writes into; shared with the endpoint below and queried
    // at the end of the test via `test_query`.
    let cache = Arc::new(LmdbCache::new(CacheOptions::default()).unwrap());
    let cache_endpoint = CacheEndpoint {
        cache: cache.clone(),
        endpoint: ApiEndpoint {
            id: Some("1".to_string()),
            name: table_name.to_string(),
            path: "/events".to_string(),
            sql: Some("select a, b from events group by a,b;".to_string()),
            index: Some(ApiIndex {
                primary_key: vec!["a".to_string()],
            }),
            ..Default::default()
        },
    };

    // Channel pair: `ingestor` is the writer handle, `iterator` the reader
    // the executor consumes.
    let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());

    // Second writer handle kept on this thread; the original is moved into
    // the executor thread below.
    let ingestor2 = ingestor.clone();
    // Shared shutdown flag: the executor polls it, this thread clears it.
    let running = Arc::new(AtomicBool::new(true));
    let r = running.clone();
    let executor_running = running;

    // Seven (a, b, c) rows to ingest; the final query expects exactly 7 hits.
    let items: Vec<(i64, String, i64)> = vec![
        (1, "yuri".to_string(), 521),
        (2, "mega".to_string(), 521),
        (3, "james".to_string(), 523),
        (4, "james".to_string(), 524),
        (5, "steff".to_string(), 526),
        (6, "mega".to_string(), 527),
        (7, "james".to_string(), 528),
    ];

    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
    // NOTE(review): TempDir::new already creates the directory, so this
    // remove/recreate presumably exists only to guarantee it starts empty —
    // the `exists()` branch should always be taken.
    if tmp_dir.path().exists() {
        fs::remove_dir_all(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
    }
    fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));

    let tmp_path = tmp_dir.path().to_owned();
    // Run the pipeline on a background thread so this thread can act as the
    // event producer. Handle is deliberately unused: the thread is signalled
    // to stop via `r` rather than joined.
    let _thread = thread::spawn(move || {
        let executor = Executor::new(
            vec![source],
            vec![cache_endpoint],
            ingestor,
            iterator,
            executor_running,
            tmp_path,
        );
        let flags = Flags::default();
        match executor.run(None, CacheSinkSettings::new(Some(flags), None)) {
            Ok(_) => {}
            // A shutdown-triggered error is expected here, so log instead of panic.
            Err(e) => warn!("Exiting: {:?}", e),
        }
    });

    // Insert each record and query cache
    for (a, b, c) in items {
        let record = Record::new(
            schema.identifier,
            vec![Field::Int(a), Field::String(b), Field::Int(c)],
            None,
        );
        ingestor2
            .write()
            .handle_message((
                // (1, 0) — presumably a (connection/txn id, offset) pair;
                // TODO confirm against the Ingestor::handle_message signature.
                (1, 0),
                IngestionMessage::OperationEvent(OperationEvent {
                    // Reuse `a` as a monotonically increasing sequence number.
                    seq_no: a as u64,
                    operation: dozer_types::types::Operation::Insert { new: record },
                }),
            ))
            .unwrap();
    }

    // Allow for the thread to process the records
    // NOTE(review): fixed 3s sleep makes this test timing-sensitive; a flaky
    // failure here likely means the executor needed longer.
    thread::sleep(Duration::from_millis(3000));
    // Shutdown the thread
    r.store(false, Ordering::SeqCst);

    // Empty query == match everything; expect all 7 inserted records.
    test_query("events".to_string(), json!({}), 7, &cache);
}
×
133

134
#[test]
#[ignore]
fn single_source_sink() {
    // First pass: run the pipeline with the schema's primary index intact.
    let schema = test_utils::schema_1().0;
    single_source_sink_impl(schema.clone());

    // Second pass: same schema, but with the primary index removed.
    let mut schema_without_pk = schema;
    schema_without_pk.primary_index.clear();
    single_source_sink_impl(schema_without_pk);
}
×
142

143
fn test_query(schema_name: String, query: Value, count: usize, cache: &LmdbCache) {
×
144
    let query = serde_json::from_value::<QueryExpression>(query).unwrap();
×
145
    let records = cache.query(&schema_name, &query).unwrap();
×
146

×
147
    assert_eq!(records.len(), count, "Count must be equal : {query:?}");
×
148
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc