• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4023928230

pending completion
4023928230

Pull #744

github

GitHub
Merge fe141bf50 into 70bd6e0ad
Pull Request #744: feat: Implement direct insert to cache pipeline

243 of 243 new or added lines in 16 files covered. (100.0%)

23197 of 34900 relevant lines covered (66.47%)

45617.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/connection/client.rs
1
use dozer_types::ingestion_types::SnowflakeConfig;
2
use dozer_types::log::debug;
3

4
use crate::errors::{ConnectorError, SnowflakeError, SnowflakeSchemaError};
5

6
use crate::connectors::snowflake::schema_helper::SchemaHelper;
7
use crate::connectors::TableInfo;
8
use crate::errors::SnowflakeError::QueryError;
9
use crate::errors::SnowflakeSchemaError::DecimalConvertError;
10
use crate::errors::SnowflakeSchemaError::SchemaConversionError;
11
use dozer_types::chrono::{NaiveDate, NaiveDateTime, NaiveTime};
12
use dozer_types::rust_decimal::Decimal;
13
use dozer_types::types::*;
14
use odbc::ffi::{SqlDataType, SQL_DATE_STRUCT, SQL_TIMESTAMP_STRUCT};
15
use odbc::odbc_safe::AutocommitOn;
16
use odbc::{
17
    ColumnDescriptor, Connection, Cursor, Data, DiagnosticRecord, Executed, HasResult, NoData,
18
    ResultSetState, Statement,
19
};
20
use std::collections::HashMap;
21
use std::fmt::Write;
22

23
fn convert_decimal(bytes: &[u8], scale: u16) -> Result<Field, SnowflakeSchemaError> {
×
24
    let is_negative = bytes[bytes.len() - 4] == 255;
×
25
    let mut multiplier: i64 = 1;
×
26
    let mut result: i64 = 0;
×
27
    let bytes: &[u8] = &bytes[4..11];
×
28
    bytes.iter().for_each(|w| {
×
29
        let number = *w as i64;
×
30
        result += number * multiplier;
×
31
        multiplier *= 256;
×
32
    });
×
33

×
34
    if is_negative {
×
35
        result = -result;
×
36
    }
×
37

38
    Ok(Field::from(
39
        Decimal::try_new(result, scale as u32).map_err(DecimalConvertError)?,
×
40
    ))
41
}
×
42

43
pub fn convert_data(
×
44
    cursor: &mut Cursor<Executed, AutocommitOn>,
×
45
    i: u16,
×
46
    column_descriptor: &ColumnDescriptor,
×
47
) -> Result<Field, SnowflakeSchemaError> {
×
48
    match column_descriptor.data_type {
×
49
        SqlDataType::SQL_CHAR | SqlDataType::SQL_VARCHAR => {
50
            match cursor
×
51
                .get_data::<String>(i)
×
52
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
53
            {
54
                None => Ok(Field::Null),
×
55
                Some(value) => Ok(Field::from(value)),
×
56
            }
57
        }
58
        SqlDataType::SQL_DECIMAL
59
        | SqlDataType::SQL_NUMERIC
60
        | SqlDataType::SQL_INTEGER
61
        | SqlDataType::SQL_SMALLINT => match column_descriptor.decimal_digits {
×
62
            None => {
63
                match cursor
×
64
                    .get_data::<i64>(i)
×
65
                    .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
66
                {
67
                    None => Ok(Field::Null),
×
68
                    Some(value) => Ok(Field::from(value)),
×
69
                }
70
            }
71
            Some(digits) => {
×
72
                match cursor
×
73
                    .get_data::<&[u8]>(i)
×
74
                    .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
75
                {
76
                    None => Ok(Field::Null),
×
77
                    Some(value) => convert_decimal(value, digits),
×
78
                }
79
            }
80
        },
81
        SqlDataType::SQL_FLOAT | SqlDataType::SQL_REAL | SqlDataType::SQL_DOUBLE => {
82
            match cursor
×
83
                .get_data::<f64>(i)
×
84
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
85
            {
86
                None => Ok(Field::Null),
×
87
                Some(value) => Ok(Field::from(value)),
×
88
            }
89
        }
90
        SqlDataType::SQL_TIMESTAMP => {
91
            match cursor
×
92
                .get_data::<SQL_TIMESTAMP_STRUCT>(i)
×
93
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
94
            {
95
                None => Ok(Field::Null),
×
96
                Some(value) => {
×
97
                    let date = NaiveDate::from_ymd(
×
98
                        value.year as i32,
×
99
                        value.month as u32,
×
100
                        value.day as u32,
×
101
                    );
×
102
                    let time = NaiveTime::from_hms_nano(
×
103
                        value.hour as u32,
×
104
                        value.minute as u32,
×
105
                        value.second as u32,
×
106
                        value.fraction,
×
107
                    );
×
108
                    Ok(Field::from(NaiveDateTime::new(date, time)))
×
109
                }
110
            }
111
        }
112
        SqlDataType::SQL_DATE => {
113
            match cursor
×
114
                .get_data::<SQL_DATE_STRUCT>(i)
×
115
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
116
            {
117
                None => Ok(Field::Null),
×
118
                Some(value) => {
×
119
                    let date = NaiveDate::from_ymd(
×
120
                        value.year as i32,
×
121
                        value.month as u32,
×
122
                        value.day as u32,
×
123
                    );
×
124
                    Ok(Field::from(date))
×
125
                }
126
            }
×
127
        }
128
        SqlDataType::SQL_EXT_BIT => {
129
            match cursor
×
130
                .get_data::<bool>(i)
×
131
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
132
            {
133
                None => Ok(Field::Null),
×
134
                Some(v) => Ok(Field::from(v)),
×
135
            }
136
        }
137
        _ => Err(SnowflakeSchemaError::ColumnTypeNotSupported(format!(
×
138
            "{:?}",
×
139
            &column_descriptor.data_type
×
140
        ))),
×
141
    }
×
142
}
×
143

×
144
/// Row iterator over the result set of an executed ODBC statement.
///
/// Each item is one row, converted column by column with `convert_data`.
pub struct ResultIterator<'a, 'b> {
    /// The executed statement; set to `None` once `close_cursor` has taken it.
    stmt: Option<Statement<'a, 'b, Executed, HasResult, AutocommitOn>>,
    /// Number of result columns, as reported by the statement's `num_result_cols`.
    cols: i16,
    /// Column descriptors in column order: entry `k` describes ODBC column `k + 1`.
    schema: Vec<ColumnDescriptor>,
}
×
149

150
impl<'a, 'b> ResultIterator<'a, 'b> {
151
    pub fn close_cursor(&mut self) -> Result<(), SnowflakeError> {
×
152
        self.stmt
×
153
            .take()
×
154
            .unwrap()
×
155
            .close_cursor()
×
156
            .map_or_else(|e| Err(QueryError(Box::new(e))), |_| Ok(()))
×
157
    }
×
158
}
×
159

×
160
impl Iterator for ResultIterator<'_, '_> {
×
161
    type Item = Vec<Field>;
×
162

×
163
    fn next(&mut self) -> Option<Self::Item> {
×
164
        return if let Some(ref mut stmt) = self.stmt {
×
165
            match stmt.fetch().unwrap() {
×
166
                None => None,
×
167
                Some(mut cursor) => {
×
168
                    let mut values = vec![];
×
169
                    for i in 1..(self.cols + 1) {
×
170
                        let descriptor = self.schema.get((i - 1) as usize)?;
×
171
                        let value = convert_data(&mut cursor, i as u16, descriptor).unwrap();
×
172
                        values.push(value);
×
173
                    }
×
174

×
175
                    Some(values)
×
176
                }
×
177
            }
×
178
        } else {
×
179
            None
×
180
        };
×
181
    }
×
182
}
×
183

×
184
/// Snowflake client that holds an ODBC connection string and runs queries over a
/// caller-supplied `Connection`.
pub struct Client {
    /// ODBC connection string assembled by `Client::new` from a `SnowflakeConfig`.
    conn_string: String,
}
×
187

×
188
impl Client {
×
189
    pub fn new(config: &SnowflakeConfig) -> Self {
×
190
        let mut conn_hashmap: HashMap<String, String> = HashMap::new();
×
191
        let driver = match &config.driver {
×
192
            None => "Snowflake".to_string(),
×
193
            Some(driver) => driver.to_string(),
×
194
        };
×
195
        conn_hashmap.insert("Driver".to_string(), driver);
×
196
        conn_hashmap.insert("Server".to_string(), config.clone().server);
×
197
        conn_hashmap.insert("Port".to_string(), config.clone().port);
×
198
        conn_hashmap.insert("Uid".to_string(), config.clone().user);
×
199
        conn_hashmap.insert("Pwd".to_string(), config.clone().password);
×
200
        conn_hashmap.insert("Schema".to_string(), config.clone().schema);
×
201
        conn_hashmap.insert("Warehouse".to_string(), config.clone().warehouse);
×
202
        conn_hashmap.insert("Database".to_string(), config.clone().database);
×
203
        conn_hashmap.insert("Role".to_string(), "ACCOUNTADMIN".to_string());
×
204

×
205
        let mut parts = vec![];
×
206
        conn_hashmap.keys().into_iter().for_each(|k| {
×
207
            parts.push(format!("{}={}", k, conn_hashmap.get(k).unwrap()));
×
208
        });
×
209

×
210
        let conn_string = parts.join(";");
×
211

×
212
        debug!("Snowflake conn string: {:?}", conn_string);
×
213

×
214
        Self { conn_string }
×
215
    }
×
216

×
217
    pub fn get_conn_string(&self) -> String {
×
218
        self.conn_string.clone()
×
219
    }
×
220

×
221
    pub fn exec(
×
222
        &self,
×
223
        conn: &Connection<AutocommitOn>,
×
224
        query: String,
×
225
    ) -> Result<Option<bool>, SnowflakeError> {
×
226
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
227

×
228
        let result = stmt
×
229
            .exec_direct(&query)
×
230
            .map_err(|e| QueryError(Box::new(e)))?;
×
231
        match result {
×
232
            Data(_) => Ok(Some(true)),
×
233
            NoData(_) => Ok(None),
×
234
        }
×
235
    }
×
236

×
237
    fn parse_not_exist_error(e: DiagnosticRecord) -> Result<bool, SnowflakeError> {
×
238
        if e.get_native_error() == 2003 {
×
239
            Ok(false)
×
240
        } else {
×
241
            Err(QueryError(Box::new(e)))
×
242
        }
×
243
    }
×
244

×
245
    fn parse_exist(result: ResultSetState<Executed, AutocommitOn>) -> bool {
×
246
        match result {
×
247
            Data(mut x) => x.fetch().unwrap().is_some(),
×
248
            NoData(_) => false,
×
249
        }
×
250
    }
×
251

×
252
    pub fn stream_exist(
×
253
        &self,
×
254
        conn: &Connection<AutocommitOn>,
×
255
        stream_name: &String,
×
256
    ) -> Result<bool, SnowflakeError> {
×
257
        let query = format!("DESCRIBE STREAM {stream_name};");
×
258

×
259
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
260
        stmt.exec_direct(&query)
×
261
            .map_or_else(Self::parse_not_exist_error, |result| {
×
262
                Ok(Self::parse_exist(result))
×
263
            })
×
264
    }
×
265

×
266
    pub fn table_exist(
×
267
        &self,
×
268
        conn: &Connection<AutocommitOn>,
×
269
        table_name: &String,
×
270
    ) -> Result<bool, SnowflakeError> {
×
271
        let query =
×
272
            format!("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{table_name}';");
×
273

×
274
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
275
        stmt.exec_direct(&query)
×
276
            .map_or_else(Self::parse_not_exist_error, |result| {
×
277
                Ok(Self::parse_exist(result))
×
278
            })
×
279
    }
×
280

×
281
    pub fn drop_stream(
×
282
        &self,
×
283
        conn: &Connection<AutocommitOn>,
×
284
        stream_name: &String,
×
285
    ) -> Result<bool, SnowflakeError> {
×
286
        let query = format!("DROP STREAM IF EXISTS {stream_name}");
×
287

×
288
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
289
        stmt.exec_direct(&query)
×
290
            .map_or_else(Self::parse_not_exist_error, |result| {
×
291
                Ok(Self::parse_exist(result))
×
292
            })
×
293
    }
×
294

×
295
    pub fn fetch<'a, 'b>(
×
296
        &self,
×
297
        conn: &'a Connection<AutocommitOn>,
×
298
        query: String,
×
299
    ) -> Result<Option<(Vec<ColumnDescriptor>, ResultIterator<'a, 'b>)>, ConnectorError> {
×
300
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
301
        // TODO: use stmt.close_cursor to improve efficiency
×
302

×
303
        match stmt
×
304
            .exec_direct(&query)
×
305
            .map_err(|e| QueryError(Box::new(e)))?
×
306
        {
×
307
            Data(stmt) => {
×
308
                let cols = stmt
×
309
                    .num_result_cols()
×
310
                    .map_err(|e| QueryError(Box::new(e)))?;
×
311
                let schema_result: Result<Vec<ColumnDescriptor>, SnowflakeError> = (1..(cols + 1))
×
312
                    .map(|i| {
×
313
                        let value = i.try_into();
×
314
                        match value {
×
315
                            Ok(v) => {
×
316
                                Ok(stmt.describe_col(v).map_err(|e| QueryError(Box::new(e)))?)
×
317
                            }
×
318
                            Err(e) => Err(SnowflakeError::SnowflakeSchemaError(
×
319
                                SchemaConversionError(e),
×
320
                            )),
×
321
                        }
×
322
                    })
×
323
                    .collect();
×
324

×
325
                let schema = schema_result?;
×
326
                Ok(Some((
×
327
                    schema.clone(),
×
328
                    ResultIterator {
×
329
                        cols,
×
330
                        stmt: Some(stmt),
×
331
                        schema,
×
332
                    },
×
333
                )))
×
334
            }
×
335
            NoData(_) => Ok(None),
×
336
        }
×
337
    }
×
338

×
339
    pub fn execute_query(
×
340
        &self,
×
341
        conn: &Connection<AutocommitOn>,
×
342
        query: &str,
×
343
    ) -> Result<(), SnowflakeError> {
×
344
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
345
        stmt.exec_direct(query)
×
346
            .map_err(|e| QueryError(Box::new(e)))?;
×
347
        Ok(())
×
348
    }
×
349

×
350
    pub fn fetch_tables(
×
351
        &self,
×
352
        tables: Option<Vec<TableInfo>>,
×
353
        tables_indexes: HashMap<String, usize>,
×
354
        keys: HashMap<String, Vec<String>>,
×
355
        conn: &Connection<AutocommitOn>,
×
356
    ) -> Result<Vec<SchemaWithChangesType>, SnowflakeError> {
×
357
        let tables_condition = tables.map_or("".to_string(), |tables| {
×
358
            let mut buf = String::new();
×
359
            buf.write_str(" AND TABLE_NAME IN(").unwrap();
×
360
            for (idx, table_info) in tables.iter().enumerate() {
×
361
                if idx > 0 {
×
362
                    buf.write_char(',').unwrap();
×
363
                }
×
364
                buf.write_str(&format!("\'{}\'", table_info.table_name))
×
365
                    .unwrap();
×
366
            }
×
367
            buf.write_char(')').unwrap();
×
368
            buf
×
369
        });
×
370

×
371
        let query = format!(
×
372
            "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, NUMERIC_SCALE
×
373
            FROM INFORMATION_SCHEMA.COLUMNS
×
374
            WHERE TABLE_SCHEMA = 'PUBLIC' {tables_condition}
×
375
            ORDER BY TABLE_NAME, ORDINAL_POSITION"
×
376
        );
×
377

×
378
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
379
        match stmt
×
380
            .exec_direct(&query)
×
381
            .map_err(|e| QueryError(Box::new(e)))?
×
382
        {
×
383
            Data(data) => {
×
384
                let cols = data
×
385
                    .num_result_cols()
×
386
                    .map_err(|e| QueryError(Box::new(e)))?;
×
387

×
388
                let schema_result: Result<Vec<ColumnDescriptor>, SnowflakeError> = (1..(cols + 1))
×
389
                    .map(|i| {
×
390
                        let value = i.try_into();
×
391
                        match value {
×
392
                            Ok(v) => {
×
393
                                Ok(data.describe_col(v).map_err(|e| QueryError(Box::new(e)))?)
×
394
                            }
×
395
                            Err(e) => Err(SnowflakeError::SnowflakeSchemaError(
×
396
                                SchemaConversionError(e),
×
397
                            )),
×
398
                        }
×
399
                    })
×
400
                    .collect();
×
401

×
402
                let schema = schema_result?;
×
403

×
404
                let mut schemas: HashMap<String, Schema> = HashMap::new();
×
405
                let iterator = ResultIterator {
×
406
                    cols,
×
407
                    stmt: Some(data),
×
408
                    schema,
×
409
                };
×
410

×
411
                for row_data in iterator {
×
412
                    let empty = "".to_string();
×
413
                    let table_name = if let Field::String(table_name) = &row_data.get(1).unwrap() {
×
414
                        table_name
×
415
                    } else {
×
416
                        &empty
×
417
                    };
×
418
                    let field_name = if let Field::String(field_name) = &row_data.get(2).unwrap() {
×
419
                        field_name
×
420
                    } else {
×
421
                        &empty
×
422
                    };
×
423
                    let type_name = if let Field::String(type_name) = &row_data.get(3).unwrap() {
×
424
                        type_name
×
425
                    } else {
×
426
                        &empty
×
427
                    };
×
428
                    let nullable = if let Field::Boolean(b) = &row_data.get(4).unwrap() {
×
429
                        b
×
430
                    } else {
×
431
                        &false
×
432
                    };
×
433
                    let scale = if let Field::Int(scale) = &row_data.get(5).unwrap() {
×
434
                        Some(*scale)
×
435
                    } else {
×
436
                        None
×
437
                    };
×
438

×
439
                    let schema_id = *tables_indexes.get(&table_name.clone()).unwrap();
×
440

×
441
                    schemas
×
442
                        .entry(table_name.clone())
×
443
                        .or_insert(Schema {
×
444
                            identifier: Some(SchemaIdentifier {
×
445
                                id: schema_id as u32,
×
446
                                version: 0,
×
447
                            }),
×
448
                            fields: vec![],
×
449
                            primary_index: vec![0],
×
450
                        })
×
451
                        .fields
×
452
                        .push(FieldDefinition {
×
453
                            name: field_name.clone(),
×
454
                            typ: SchemaHelper::map_schema_type(type_name, scale)?,
×
455
                            nullable: *nullable,
×
456
                            source: SourceDefinition::Dynamic,
×
457
                        })
×
458
                }
×
459

×
460
                Ok(schemas
×
461
                    .into_iter()
×
462
                    .map(|(name, mut schema)| {
×
463
                        let mut indexes = vec![];
×
464
                        keys.get(&name).map_or((), |columns| {
×
465
                            schema.fields.iter().enumerate().for_each(|(idx, f)| {
×
466
                                if columns.contains(&f.name) {
×
467
                                    indexes.push(idx);
×
468
                                }
×
469
                            });
×
470
                        });
×
471

×
472
                        schema.primary_index = indexes;
×
473

×
474
                        (name, schema, ReplicationChangesTrackingType::FullChanges)
×
475
                    })
×
476
                    .collect())
×
477
            }
×
478
            NoData(_) => Ok(vec![]),
×
479
        }
×
480
    }
×
481

×
482
    pub fn fetch_keys(
×
483
        &self,
×
484
        conn: &Connection<AutocommitOn>,
×
485
    ) -> Result<HashMap<String, Vec<String>>, SnowflakeError> {
×
486
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
487
        match stmt
×
488
            .exec_direct("SHOW PRIMARY KEYS IN SCHEMA")
×
489
            .map_err(|e| QueryError(Box::new(e)))?
×
490
        {
×
491
            Data(data) => {
×
492
                let cols = data
×
493
                    .num_result_cols()
×
494
                    .map_err(|e| QueryError(Box::new(e)))?;
×
495

×
496
                let schema_result: Result<Vec<ColumnDescriptor>, SnowflakeError> = (1..(cols + 1))
×
497
                    .map(|i| {
×
498
                        let value = i.try_into();
×
499
                        match value {
×
500
                            Ok(v) => {
×
501
                                Ok(data.describe_col(v).map_err(|e| QueryError(Box::new(e)))?)
×
502
                            }
×
503
                            Err(e) => Err(SnowflakeError::SnowflakeSchemaError(
×
504
                                SchemaConversionError(e),
×
505
                            )),
×
506
                        }
×
507
                    })
×
508
                    .collect();
×
509

×
510
                let schema = schema_result?;
×
511

×
512
                let mut keys: HashMap<String, Vec<String>> = HashMap::new();
×
513
                let iterator = ResultIterator {
×
514
                    cols,
×
515
                    stmt: Some(data),
×
516
                    schema,
×
517
                };
×
518

×
519
                for row_data in iterator {
×
520
                    let empty = "".to_string();
×
521
                    let table_name = row_data.get(3).map_or(empty.clone(), |v| match v {
×
522
                        Field::String(v) => v.clone(),
×
523
                        _ => empty.clone(),
×
524
                    });
×
525
                    let column_name = row_data.get(4).map_or(empty.clone(), |v| match v {
×
526
                        Field::String(v) => v.clone(),
×
527
                        _ => empty.clone(),
×
528
                    });
×
529

×
530
                    keys.entry(table_name).or_default().push(column_name);
×
531
                }
×
532

533
                Ok(keys)
×
534
            }
535
            NoData(_) => Ok(HashMap::new()),
×
536
        }
537
    }
×
538
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc