• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/connector.rs
1
#[cfg(feature = "snowflake")]
2
use odbc::create_environment_v3;
3
use std::collections::HashMap;
4
use std::sync::Arc;
5
#[cfg(feature = "snowflake")]
6
use std::time::Duration;
7

8
#[cfg(feature = "snowflake")]
9
use crate::connectors::snowflake::connection::client::Client;
10
use crate::connectors::{Connector, ValidationResults};
11
use crate::ingestion::Ingestor;
12
use crate::{connectors::TableInfo, errors::ConnectorError};
13
use dozer_types::ingestion_types::SnowflakeConfig;
14
use dozer_types::parking_lot::RwLock;
15

16
#[cfg(feature = "snowflake")]
17
use crate::connectors::snowflake::snapshotter::Snapshotter;
18
#[cfg(feature = "snowflake")]
19
use crate::connectors::snowflake::stream_consumer::StreamConsumer;
20
#[cfg(feature = "snowflake")]
21
use crate::errors::SnowflakeError::ConnectionError;
22

23
use dozer_types::types::SchemaWithChangesType;
24
use tokio::runtime::Runtime;
25
#[cfg(feature = "snowflake")]
26
use tokio::time;
27

28
pub struct SnowflakeConnector {
29
    pub id: u64,
30
    config: SnowflakeConfig,
31
    ingestor: Option<Arc<RwLock<Ingestor>>>,
32
    tables: Option<Vec<TableInfo>>,
33
}
34

35
impl SnowflakeConnector {
36
    pub fn new(id: u64, config: SnowflakeConfig) -> Self {
×
37
        Self {
×
38
            id,
×
39
            config,
×
40
            ingestor: None,
×
41
            tables: None,
×
42
        }
×
43
    }
×
44
}
45

46
impl Connector for SnowflakeConnector {
47
    #[cfg(feature = "snowflake")]
48
    fn get_schemas(
×
49
        &self,
×
50
        table_names: Option<Vec<TableInfo>>,
×
51
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
×
52
        let client = Client::new(&self.config);
×
53
        let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
×
54
        let conn = env
×
55
            .connect_with_connection_string(&client.get_conn_string())
×
56
            .map_err(|e| ConnectionError(Box::new(e)))?;
×
57

58
        let keys = client
×
59
            .fetch_keys(&conn)
×
60
            .map_err(ConnectorError::SnowflakeError)?;
×
61

62
        let tables_indexes = table_names.clone().map_or(HashMap::new(), |tables| {
×
63
            let mut result = HashMap::new();
×
64
            for (idx, table) in tables.iter().enumerate() {
×
65
                result.insert(table.table_name.clone(), idx);
×
66
            }
×
67

68
            result
×
69
        });
×
70

×
71
        client
×
72
            .fetch_tables(table_names, tables_indexes, keys, &conn)
×
73
            .map_err(ConnectorError::SnowflakeError)
×
74
    }
×
75

76
    #[cfg(not(feature = "snowflake"))]
77
    fn get_schemas(
78
        &self,
79
        _table_names: Option<Vec<TableInfo>>,
80
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
81
        todo!()
82
    }
83

84
    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
×
85
        todo!()
×
86
    }
×
87

88
    fn test_connection(&self) -> Result<(), ConnectorError> {
×
89
        todo!()
×
90
    }
×
91

92
    fn initialize(
×
93
        &mut self,
×
94
        ingestor: Arc<RwLock<Ingestor>>,
×
95
        tables: Option<Vec<TableInfo>>,
×
96
    ) -> Result<(), ConnectorError> {
×
97
        self.ingestor = Some(ingestor);
×
98
        self.tables = tables;
×
99
        Ok(())
×
100
    }
×
101

102
    fn start(&self, from_seq: Option<(u64, u64)>) -> Result<(), ConnectorError> {
×
103
        let _connector_id = self.id;
×
104
        let ingestor = self
×
105
            .ingestor
×
106
            .as_ref()
×
107
            .map_or(Err(ConnectorError::InitializationError), Ok)?
×
108
            .clone();
×
109

×
110
        Runtime::new().unwrap().block_on(async {
×
111
            run(self.config.clone(), self.tables.clone(), ingestor, from_seq).await
×
112
        })
×
113
    }
×
114

115
    fn stop(&self) {}
×
116

117
    fn validate(&self, _tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
118
        Ok(())
×
119
    }
×
120

121
    fn validate_schemas(&self, _tables: &[TableInfo]) -> Result<ValidationResults, ConnectorError> {
×
122
        Ok(HashMap::new())
×
123
    }
×
124
}
125

126
#[cfg(feature = "snowflake")]
127
async fn run(
×
128
    config: SnowflakeConfig,
×
129
    tables: Option<Vec<TableInfo>>,
×
130
    ingestor: Arc<RwLock<Ingestor>>,
×
131
    from_seq: Option<(u64, u64)>,
×
132
) -> Result<(), ConnectorError> {
×
133
    let client = Client::new(&config);
×
134

×
135
    // SNAPSHOT part - run it when stream table doesnt exist
×
136
    match tables {
×
137
        None => {}
×
138
        Some(tables) => {
×
139
            for (idx, table) in tables.iter().enumerate() {
×
140
                let is_stream_created =
×
141
                    StreamConsumer::is_stream_created(&client, table.table_name.clone())?;
×
142
                if !is_stream_created {
×
143
                    let ingestor_snapshot = Arc::clone(&ingestor);
×
144
                    Snapshotter::run(
×
145
                        &client,
×
146
                        &ingestor_snapshot,
×
147
                        table.table_name.clone(),
×
148
                        idx,
×
149
                        from_seq.map_or(0, |(_, offset)| offset as usize),
×
150
                    )?;
×
151
                    StreamConsumer::create_stream(&client, &table.table_name)?;
×
152
                }
×
153
            }
154

×
155
            let stream_client = Client::new(&config);
×
156
            let ingestor_stream = Arc::clone(&ingestor);
×
157
            let mut interval = time::interval(Duration::from_secs(5));
×
158

×
159
            let mut consumer = StreamConsumer::new();
×
160
            loop {
×
161
                for (idx, table) in tables.iter().enumerate() {
×
162
                    consumer.consume_stream(
×
163
                        &stream_client,
×
164
                        &table.table_name,
×
165
                        &ingestor_stream,
×
166
                        idx,
×
167
                    )?;
×
168

×
169
                    interval.tick().await;
×
170
                }
171
            }
172
        }
173
    };
174

175
    Ok(())
×
176
}
×
177

178
#[cfg(not(feature = "snowflake"))]
179
async fn run(
180
    _config: SnowflakeConfig,
181
    _tables: Option<Vec<TableInfo>>,
182
    _ingestor: Arc<RwLock<Ingestor>>,
183
) -> Result<(), ConnectorError> {
184
    Ok(())
185
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc