• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs
1
use crate::connectors::snowflake::connection::client::Client;
2

3
use crate::errors::{ConnectorError, SnowflakeError};
4
use crate::ingestion::Ingestor;
5
use dozer_types::ingestion_types::IngestionMessage;
6

7
use dozer_types::parking_lot::RwLock;
8

9
use crate::connectors::snowflake::snapshotter::Snapshotter;
10
use crate::errors::SnowflakeStreamError::{CannotDetermineAction, UnsupportedActionInStream};
11
use dozer_types::types::{Field, Operation, OperationEvent, Record, SchemaIdentifier};
12
use odbc::create_environment_v3;
13
use std::sync::Arc;
14

15
/// Consumer for the Dozer-managed Snowflake stream of a table.
///
/// The struct carries no state of its own; everything it needs (client,
/// table name, ingestor) is passed to its associated functions per call.
#[derive(Default)]
pub struct StreamConsumer {}
17

18
impl StreamConsumer {
19
    pub fn new() -> Self {
×
20
        Self {}
×
21
    }
×
22

23
    pub fn get_stream_table_name(table_name: &str) -> String {
×
24
        format!("dozer_{table_name}_stream")
×
25
    }
×
26

27
    pub fn get_stream_temp_table_name(table_name: &str) -> String {
×
28
        format!("dozer_{table_name}_stream_temp")
×
29
    }
×
30

31
    pub fn is_stream_created(client: &Client, table_name: String) -> Result<bool, ConnectorError> {
×
32
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
33
        let conn = env
×
34
            .connect_with_connection_string(&client.get_conn_string())
×
35
            .unwrap();
×
36

×
37
        client
×
38
            .stream_exist(&conn, &Self::get_stream_table_name(&table_name))
×
39
            .map_err(ConnectorError::SnowflakeError)
×
40
    }
×
41

42
    pub fn create_stream(client: &Client, table_name: &String) -> Result<(), ConnectorError> {
×
43
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
44
        let conn = env
×
45
            .connect_with_connection_string(&client.get_conn_string())
×
46
            .unwrap();
×
47

×
48
        let query = format!(
×
49
            "CREATE STREAM {} on table {} at(stream => '{}')",
×
50
            Self::get_stream_table_name(table_name),
×
51
            table_name,
×
52
            Snapshotter::get_snapshot_table_name(table_name)
×
53
        );
×
54

×
55
        client
×
56
            .exec(&conn, query)
×
57
            .map(|_| ())
×
58
            .map_err(ConnectorError::SnowflakeError)
×
59
    }
×
60

61
    fn map_record(row: Vec<Field>, table_idx: usize) -> Record {
×
62
        Record {
×
63
            schema_id: Some(SchemaIdentifier {
×
64
                id: table_idx as u32,
×
65
                version: 1,
×
66
            }),
×
67
            values: row,
×
68
            version: None,
×
69
        }
×
70
    }
×
71

×
72
    fn get_ingestion_message(
×
73
        row: Vec<Field>,
×
74
        action_idx: usize,
×
75
        used_columns_for_schema: usize,
×
76
        table_idx: usize,
×
77
    ) -> Result<IngestionMessage, ConnectorError> {
78
        if let Field::String(action) = row.get(action_idx).unwrap() {
×
79
            let mut row_mut = row.clone();
×
80
            let insert_action = &"INSERT";
×
81
            let delete_action = &"DELETE";
×
82

×
83
            if insert_action == action {
×
84
                row_mut.truncate(used_columns_for_schema);
×
85
                Ok(IngestionMessage::OperationEvent(OperationEvent {
×
86
                    seq_no: 0,
×
87
                    operation: Operation::Insert {
×
88
                        new: Self::map_record(row, table_idx),
×
89
                    },
×
90
                }))
×
91
            } else if delete_action == action {
×
92
                row_mut.truncate(used_columns_for_schema);
×
93

×
94
                Ok(IngestionMessage::OperationEvent(OperationEvent {
×
95
                    seq_no: 0,
×
96
                    operation: Operation::Delete {
×
97
                        old: Self::map_record(row, table_idx),
×
98
                    },
×
99
                }))
×
100
            } else {
×
101
                Err(ConnectorError::SnowflakeError(
×
102
                    SnowflakeError::SnowflakeStreamError(UnsupportedActionInStream(action.clone())),
×
103
                ))
×
104
            }
×
105
        } else {
×
106
            Err(ConnectorError::SnowflakeError(
×
107
                SnowflakeError::SnowflakeStreamError(CannotDetermineAction),
×
108
            ))
×
109
        }
×
110
    }
×
111

×
112
    pub fn consume_stream(
×
113
        &mut self,
×
114
        client: &Client,
×
115
        table_name: &str,
×
116
        ingestor: &Arc<RwLock<Ingestor>>,
×
117
        table_idx: usize,
×
118
    ) -> Result<(), ConnectorError> {
×
119
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
120
        let conn = env
×
121
            .connect_with_connection_string(&client.get_conn_string())
×
122
            .unwrap();
×
123

×
124
        let temp_table_name = Self::get_stream_temp_table_name(table_name);
×
125
        let stream_name = Self::get_stream_table_name(table_name);
×
126
        let temp_table_exist = client.table_exist(&conn, &temp_table_name)?;
×
127

×
128
        if !temp_table_exist {
×
129
            let query = format!(
×
130
                "CREATE OR REPLACE TEMP TABLE {temp_table_name} AS
×
131
                    SELECT * FROM {stream_name} ORDER BY METADATA$ACTION;"
×
132
            );
×
133

×
134
            client.exec(&conn, query)?;
×
135
        }
×
136

×
137
        let result = client.fetch(&conn, format!("SELECT * FROM {temp_table_name};"))?;
×
138
        if let Some((schema, iterator)) = result {
×
139
            let mut truncated_schema = schema.clone();
×
140
            truncated_schema.truncate(schema.len() - 3);
×
141

×
142
            let columns_length = schema.len();
×
143
            let used_columns_for_schema = columns_length - 3;
×
144
            let action_idx = used_columns_for_schema;
×
145

×
146
            for (idx, row) in iterator.enumerate() {
×
147
                let ingestion_message = Self::get_ingestion_message(
×
148
                    row,
×
149
                    action_idx,
×
150
                    used_columns_for_schema,
×
151
                    table_idx,
×
152
                )?;
×
153
                ingestor
×
154
                    .write()
×
155
                    .handle_message(((1, idx as u64), ingestion_message))
×
156
                    .map_err(ConnectorError::IngestorError)?;
×
157
            }
×
158
        }
×
159

×
160
        let query = format!("DROP TABLE {temp_table_name};");
×
161

×
162
        client
×
163
            .exec(&conn, query)
×
164
            .map_err(ConnectorError::SnowflakeError)
×
165
            .map(|_| ())
×
166
    }
×
167
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc