• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/stream_consumer.rs
1
use crate::connectors::snowflake::connection::client::Client;
2

3
use crate::errors::{ConnectorError, SnowflakeError};
4
use crate::ingestion::Ingestor;
5
use dozer_types::ingestion_types::IngestionMessage;
6

7
use dozer_types::parking_lot::RwLock;
8

9
use crate::errors::SnowflakeStreamError::{CannotDetermineAction, UnsupportedActionInStream};
10
use dozer_types::types::{Field, Operation, OperationEvent, Record, SchemaIdentifier};
11
use odbc::create_environment_v3;
12
use std::sync::Arc;
13

14
#[derive(Default)]
×
15
pub struct StreamConsumer {}
16

17
impl StreamConsumer {
18
    pub fn new() -> Self {
×
19
        Self {}
×
20
    }
×
21

22
    pub fn get_stream_table_name(table_name: &str) -> String {
×
23
        format!("dozer_{table_name}_stream")
×
24
    }
×
25

26
    pub fn get_stream_temp_table_name(table_name: &str) -> String {
×
27
        format!("dozer_{table_name}_stream_temp")
×
28
    }
×
29

30
    pub fn is_stream_created(client: &Client, table_name: &str) -> Result<bool, ConnectorError> {
×
31
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
32
        let conn = env
×
33
            .connect_with_connection_string(&client.get_conn_string())
×
34
            .unwrap();
×
35

×
36
        client
×
37
            .stream_exist(&conn, &Self::get_stream_table_name(table_name))
×
38
            .map_err(ConnectorError::SnowflakeError)
×
39
    }
×
40

41
    pub fn drop_stream(client: &Client, table_name: &str) -> Result<Option<bool>, SnowflakeError> {
×
42
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
43
        let conn = env
×
44
            .connect_with_connection_string(&client.get_conn_string())
×
45
            .unwrap();
×
46

×
47
        let query = format!(
×
48
            "DROP STREAM IF EXISTS {}",
×
49
            Self::get_stream_table_name(table_name),
×
50
        );
×
51

×
52
        client.exec(&conn, query)
×
53
    }
×
54

55
    pub fn create_stream(client: &Client, table_name: &String) -> Result<(), ConnectorError> {
×
56
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
57
        let conn = env
×
58
            .connect_with_connection_string(&client.get_conn_string())
×
59
            .unwrap();
×
60

×
61
        let query = format!(
×
62
            "CREATE STREAM {} on table {} SHOW_INITIAL_ROWS = TRUE",
×
63
            Self::get_stream_table_name(table_name),
×
64
            table_name,
×
65
        );
×
66

67
        let result = client.exec_stream_creation(&conn, query)?;
×
68

×
69
        if !result {
×
70
            let query = format!(
×
71
                "CREATE STREAM {} on view {} SHOW_INITIAL_ROWS = TRUE",
×
72
                Self::get_stream_table_name(table_name),
×
73
                table_name
×
74
            );
×
75
            client.exec(&conn, query)?;
×
76
        }
×
77

78
        Ok(())
×
79
    }
×
80

81
    fn map_record(row: Vec<Field>, table_idx: usize) -> Record {
×
82
        Record {
×
83
            schema_id: Some(SchemaIdentifier {
×
84
                id: table_idx as u32,
×
85
                version: 1,
×
86
            }),
×
87
            values: row,
×
88
            version: None,
×
89
        }
×
90
    }
×
91

×
92
    fn get_ingestion_message(
×
93
        row: Vec<Field>,
×
94
        action_idx: usize,
×
95
        used_columns_for_schema: usize,
×
96
        table_idx: usize,
×
97
    ) -> Result<IngestionMessage, ConnectorError> {
×
98
        if let Field::String(action) = row.get(action_idx).unwrap() {
×
99
            let mut row_mut = row.clone();
×
100
            let insert_action = &"INSERT";
×
101
            let delete_action = &"DELETE";
×
102

×
103
            row_mut.truncate(used_columns_for_schema);
×
104

×
105
            if insert_action == action {
×
106
                Ok(IngestionMessage::OperationEvent(OperationEvent {
×
107
                    seq_no: 0,
×
108
                    operation: Operation::Insert {
×
109
                        new: Self::map_record(row_mut, table_idx),
×
110
                    },
×
111
                }))
×
112
            } else if delete_action == action {
×
113
                Ok(IngestionMessage::OperationEvent(OperationEvent {
×
114
                    seq_no: 0,
×
115
                    operation: Operation::Delete {
×
116
                        old: Self::map_record(row_mut, table_idx),
×
117
                    },
×
118
                }))
×
119
            } else {
×
120
                Err(ConnectorError::SnowflakeError(
×
121
                    SnowflakeError::SnowflakeStreamError(UnsupportedActionInStream(action.clone())),
×
122
                ))
×
123
            }
×
124
        } else {
×
125
            Err(ConnectorError::SnowflakeError(
×
126
                SnowflakeError::SnowflakeStreamError(CannotDetermineAction),
×
127
            ))
×
128
        }
×
129
    }
×
130

×
131
    pub fn consume_stream(
×
132
        &mut self,
×
133
        client: &Client,
×
134
        table_name: &str,
×
135
        ingestor: &Arc<RwLock<Ingestor>>,
×
136
        table_idx: usize,
×
137
        iteration: u64,
×
138
    ) -> Result<(), ConnectorError> {
×
139
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
140
        let conn = env
×
141
            .connect_with_connection_string(&client.get_conn_string())
×
142
            .unwrap();
×
143

×
144
        let temp_table_name = Self::get_stream_temp_table_name(table_name);
×
145
        let stream_name = Self::get_stream_table_name(table_name);
×
146
        let temp_table_exist = client.table_exist(&conn, &temp_table_name)?;
×
147

×
148
        if !temp_table_exist {
×
149
            let query = format!(
×
150
                "CREATE OR REPLACE TEMP TABLE {temp_table_name} AS
×
151
                    SELECT * FROM {stream_name} ORDER BY METADATA$ACTION;"
×
152
            );
×
153

×
154
            client.exec(&conn, query)?;
×
155
        }
×
156

×
157
        let result = client.fetch(&conn, format!("SELECT * FROM {temp_table_name};"))?;
×
158
        if let Some((schema, iterator)) = result {
×
159
            let mut truncated_schema = schema.clone();
×
160
            truncated_schema.truncate(schema.len() - 3);
×
161

×
162
            let columns_length = schema.len();
×
163
            let used_columns_for_schema = columns_length - 3;
×
164
            let action_idx = used_columns_for_schema;
×
165

166
            for (idx, row) in iterator.enumerate() {
×
167
                let ingestion_message = Self::get_ingestion_message(
×
168
                    row,
×
169
                    action_idx,
×
170
                    used_columns_for_schema,
×
171
                    table_idx,
×
172
                )?;
×
173
                ingestor
×
174
                    .write()
×
175
                    .handle_message(((iteration, idx as u64), ingestion_message))
×
176
                    .map_err(ConnectorError::IngestorError)?;
×
177
            }
178
        }
×
179

180
        let query = format!("DROP TABLE {temp_table_name};");
×
181

×
182
        client
×
183
            .exec(&conn, query)
×
184
            .map_err(ConnectorError::SnowflakeError)
×
185
            .map(|_| ())
×
186
    }
×
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc