• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075746792

pending completion
4075746792

Pull #790

github

GitHub
Merge 75b5f293b into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

393 of 393 new or added lines in 12 files covered. (100.0%)

24721 of 36724 relevant lines covered (67.32%)

55788.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/connector.rs
1
#[cfg(feature = "snowflake")]
2
use odbc::create_environment_v3;
3
use std::collections::HashMap;
4
use std::sync::Arc;
5
#[cfg(feature = "snowflake")]
6
use std::time::Duration;
7

8
#[cfg(feature = "snowflake")]
9
use crate::connectors::snowflake::connection::client::Client;
10
use crate::connectors::{Connector, ValidationResults};
11
use crate::ingestion::Ingestor;
12
use crate::{connectors::TableInfo, errors::ConnectorError};
13
use dozer_types::ingestion_types::SnowflakeConfig;
14
use dozer_types::parking_lot::RwLock;
15

16
#[cfg(feature = "snowflake")]
17
use crate::connectors::snowflake::stream_consumer::StreamConsumer;
18
#[cfg(feature = "snowflake")]
19
use crate::errors::SnowflakeError::ConnectionError;
20

21
#[cfg(feature = "snowflake")]
22
use dozer_types::log::{debug, info};
23

24
use dozer_types::types::SchemaWithChangesType;
25
use tokio::runtime::Runtime;
26
#[cfg(feature = "snowflake")]
27
use tokio::time;
28

29
#[cfg(feature = "snowflake")]
30
use crate::errors::{SnowflakeError, SnowflakeStreamError};
31

32
pub struct SnowflakeConnector {
33
    name: String,
34
    config: SnowflakeConfig,
35
    ingestor: Option<Arc<RwLock<Ingestor>>>,
36
    tables: Option<Vec<TableInfo>>,
37
}
38

39
impl SnowflakeConnector {
×
40
    pub fn new(name: String, config: SnowflakeConfig) -> Self {
×
41
        Self {
×
42
            name,
×
43
            config,
×
44
            ingestor: None,
×
45
            tables: None,
×
46
        }
×
47
    }
×
48
}
49

50
impl Connector for SnowflakeConnector {
51
    #[cfg(feature = "snowflake")]
×
52
    fn get_schemas(
×
53
        &self,
×
54
        table_names: Option<Vec<TableInfo>>,
×
55
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
×
56
        let client = Client::new(&self.config);
×
57
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
58
        let conn = env
×
59
            .connect_with_connection_string(&client.get_conn_string())
×
60
            .map_err(|e| ConnectionError(Box::new(e)))?;
×
61

×
62
        let keys = client
×
63
            .fetch_keys(&conn)
×
64
            .map_err(ConnectorError::SnowflakeError)?;
×
65

×
66
        let tables_indexes = table_names.clone().map_or(HashMap::new(), |tables| {
×
67
            let mut result = HashMap::new();
×
68
            for (idx, table) in tables.iter().enumerate() {
×
69
                result.insert(table.table_name.clone(), idx);
×
70
            }
×
71

×
72
            result
×
73
        });
×
74

×
75
        client
×
76
            .fetch_tables(table_names, tables_indexes, keys, &conn)
×
77
            .map_err(ConnectorError::SnowflakeError)
×
78
    }
×
79

80
    #[cfg(not(feature = "snowflake"))]
81
    fn get_schemas(
82
        &self,
83
        _table_names: Option<Vec<TableInfo>>,
84
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
85
        todo!()
86
    }
87

×
88
    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
×
89
        todo!()
×
90
    }
×
91

×
92
    fn test_connection(&self) -> Result<(), ConnectorError> {
×
93
        todo!()
×
94
    }
×
95

×
96
    fn initialize(
×
97
        &mut self,
×
98
        ingestor: Arc<RwLock<Ingestor>>,
×
99
        tables: Option<Vec<TableInfo>>,
×
100
    ) -> Result<(), ConnectorError> {
×
101
        self.ingestor = Some(ingestor);
×
102
        self.tables = tables;
×
103
        Ok(())
×
104
    }
×
105

×
106
    fn start(&self, from_seq: Option<(u64, u64)>) -> Result<(), ConnectorError> {
×
107
        let ingestor = self
×
108
            .ingestor
×
109
            .as_ref()
×
110
            .map_or(Err(ConnectorError::InitializationError), Ok)?
×
111
            .clone();
×
112

×
113
        Runtime::new().unwrap().block_on(async {
×
114
            run(
×
115
                self.name.clone(),
×
116
                self.config.clone(),
×
117
                self.tables.clone(),
×
118
                ingestor,
×
119
                from_seq,
×
120
            )
×
121
            .await
×
122
        })
×
123
    }
×
124

×
125
    fn stop(&self) {}
×
126

×
127
    fn validate(&self, _tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
128
        Ok(())
×
129
    }
×
130

×
131
    fn validate_schemas(&self, _tables: &[TableInfo]) -> Result<ValidationResults, ConnectorError> {
×
132
        Ok(HashMap::new())
×
133
    }
×
134
}
135

136
/// Continuously reads the Snowflake change streams of `tables` and hands them to `ingestor`.
///
/// Returns `Ok(())` immediately when `tables` is `None`. Otherwise it loops until an error
/// occurs: every pass consumes the stream of each table in order and awaits a tick of a
/// 5-second interval after each table.
///
/// During the first pass the stream of every table is prepared:
/// * `from_seq` is `None` or `Some((0, _))`: the stream is dropped and created again;
/// * `from_seq` is `Some((lsn, seq))`: ingestion resumes with the iteration counter set to
///   `lsn`, and the stream must already exist.
///
/// # Errors
/// Returns `SnowflakeStreamError::StreamNotFound` (wrapped in `ConnectorError`) when resuming
/// and a stream is reported as missing, and propagates errors from dropping, creating or
/// consuming a stream.
#[cfg(feature = "snowflake")]
async fn run(
    name: String,
    config: SnowflakeConfig,
    tables: Option<Vec<TableInfo>>,
    ingestor: Arc<RwLock<Ingestor>>,
    from_seq: Option<(u64, u64)>,
) -> Result<(), ConnectorError> {
    let client = Client::new(&config);

    // SNAPSHOT part - run it when stream table doesn't exist
    let tables = match tables {
        Some(tables) => tables,
        None => return Ok(()),
    };

    let stream_client = Client::new(&config);
    let ingestor_stream = Arc::clone(&ingestor);
    let mut interval = time::interval(Duration::from_secs(5));

    let mut consumer = StreamConsumer::new();
    let mut iteration = 1;
    // Tracked separately from `iteration`: resuming overwrites `iteration` with the stored
    // lsn, so comparing `iteration` against 1 would skip the stream check for every table
    // after the first one.
    let mut first_pass = true;
    loop {
        for (idx, table) in tables.iter().enumerate() {
            // We only check stream status on the first pass over the tables
            if first_pass {
                match from_seq {
                    None | Some((0, _)) => {
                        info!("[{}][{}] Creating new stream", name, table.table_name);
                        StreamConsumer::drop_stream(&client, &table.table_name)?;
                        StreamConsumer::create_stream(&client, &table.table_name)?;
                    }
                    Some((lsn, seq)) => {
                        info!(
                            "[{}][{}] Continuing ingestion from {}/{}",
                            name, table.table_name, lsn, seq
                        );
                        iteration = lsn;
                        // NOTE(review): an `Err` from `is_stream_created` is ignored here and
                        // ingestion continues; confirm that this is intentional.
                        if let Ok(false) =
                            StreamConsumer::is_stream_created(&client, &table.table_name)
                        {
                            return Err(ConnectorError::SnowflakeError(
                                SnowflakeError::SnowflakeStreamError(
                                    SnowflakeStreamError::StreamNotFound,
                                ),
                            ));
                        }
                    }
                }
            }

            debug!(
                "[{}][{}] Reading from changes stream",
                name, table.table_name
            );

            consumer.consume_stream(
                &stream_client,
                &table.table_name,
                &ingestor_stream,
                idx,
                iteration,
            )?;

            interval.tick().await;
        }

        first_pass = false;
        iteration += 1;
    }
}
×
208

209
#[cfg(not(feature = "snowflake"))]
210
async fn run(
211
    _name: String,
212
    _config: SnowflakeConfig,
213
    _tables: Option<Vec<TableInfo>>,
214
    _ingestor: Arc<RwLock<Ingestor>>,
215
    _from_seq: Option<(u64, u64)>,
216
) -> Result<(), ConnectorError> {
217
    Ok(())
218
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc