• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/snapshotter.rs
1
use crate::connectors::snowflake::connection::client::Client;
2
use crate::errors::ConnectorError;
3
use crate::ingestion::Ingestor;
4
use dozer_types::ingestion_types::IngestionMessage;
5
use dozer_types::parking_lot::RwLock;
6
use dozer_types::types::{Operation, OperationEvent, Record, SchemaIdentifier};
7

8
use crate::errors::SnowflakeError::ConnectionError;
9
use odbc::create_environment_v3;
10
use std::sync::Arc;
11

12
/// Namespace type for the Snowflake snapshot routines.
///
/// It has no fields; its functionality is exposed through associated
/// functions that take no `self`.
pub struct Snapshotter {}
13

14
impl Snapshotter {
15
    /// Returns the name of the Snowflake stream used to snapshot `table_name`.
    ///
    /// The result has the form `dozer_<table_name>_snapshot`. The table name is
    /// inserted verbatim: no quoting, escaping, or case normalization is applied.
    ///
    /// Takes `&str` rather than `&String` so callers holding either an owned
    /// `String` or a string slice can call it without allocating.
    pub fn get_snapshot_table_name(table_name: &str) -> String {
        format!("dozer_{table_name}_snapshot")
    }
×
18

19
    pub fn run(
×
20
        client: &Client,
×
21
        ingestor: &Arc<RwLock<Ingestor>>,
×
22
        table_name: String,
×
23
        table_idx: usize,
×
24
        offset: usize,
×
25
    ) -> Result<(), ConnectorError> {
×
26
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
27
        let conn = env
×
28
            .connect_with_connection_string(&client.get_conn_string())
×
29
            .map_err(|e| ConnectionError(Box::new(e)))?;
×
30

×
31
        let snapshot_table = Snapshotter::get_snapshot_table_name(&table_name);
×
32
        let query = format!(
×
33
            "CREATE STREAM IF NOT EXISTS {snapshot_table} ON TABLE {table_name} SHOW_INITIAL_ROWS = TRUE;"
×
34
        );
×
35
        client.exec(&conn, query)?;
×
36

37
        let mut idx = offset;
×
38
        let result = client.fetch(&conn, format!("SELECT * EXCLUDE (\"METADATA$ACTION\", \"METADATA$ISUPDATE\", \"METADATA$ROW_ID\") FROM {snapshot_table};"))?;
×
39
        if let Some((_, mut iterator)) = result {
×
40
            for values in iterator.by_ref() {
×
41
                ingestor
×
42
                    .write()
×
43
                    .handle_message((
×
44
                        (0, idx as u64),
×
45
                        IngestionMessage::OperationEvent(OperationEvent {
×
46
                            seq_no: 0,
×
47
                            operation: Operation::Insert {
×
48
                                new: Record {
×
49
                                    schema_id: Some(SchemaIdentifier {
×
50
                                        id: table_idx as u32,
×
51
                                        version: 1,
×
52
                                    }),
×
53
                                    values,
×
54
                                    version: None,
×
55
                                },
×
56
                            },
×
57
                        }),
×
58
                    ))
×
59
                    .map_err(ConnectorError::IngestorError)?;
×
60

×
61
                idx += 1;
×
62
            }
×
63
            iterator.close_cursor()?;
×
64
        }
×
65

×
66
        Ok(())
×
67
    }
×
68
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc