• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4075835066

pending completion
4075835066

Pull #790

github

GitHub
Merge 39f3c7143 into 3223082a5
Pull Request #790: refactor: Use `daggy` for the underlying data structure of `Dag`

396 of 396 new or added lines in 11 files covered. (100.0%)

24551 of 36528 relevant lines covered (67.21%)

54898.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/snowflake/connection/client.rs
1
use dozer_types::ingestion_types::SnowflakeConfig;
2
use dozer_types::log::debug;
3

4
use crate::errors::{ConnectorError, SnowflakeError, SnowflakeSchemaError};
5

6
use crate::connectors::snowflake::schema_helper::SchemaHelper;
7
use crate::connectors::TableInfo;
8
use crate::errors::SnowflakeError::{QueryError, SnowflakeStreamError};
9
use crate::errors::SnowflakeSchemaError::DecimalConvertError;
10
use crate::errors::SnowflakeSchemaError::SchemaConversionError;
11
use crate::errors::SnowflakeStreamError::TimeTravelNotAvailableError;
12
use dozer_types::chrono::{NaiveDate, NaiveDateTime, NaiveTime};
13
use dozer_types::rust_decimal::Decimal;
14
use dozer_types::types::*;
15
use odbc::ffi::{SqlDataType, SQL_DATE_STRUCT, SQL_TIMESTAMP_STRUCT};
16
use odbc::odbc_safe::AutocommitOn;
17
use odbc::{
18
    ColumnDescriptor, Connection, Cursor, Data, DiagnosticRecord, Executed, HasResult, NoData,
19
    ResultSetState, Statement,
20
};
21
use std::collections::HashMap;
22
use std::fmt::Write;
23

24
fn convert_decimal(bytes: &[u8], scale: u16) -> Result<Field, SnowflakeSchemaError> {
×
25
    let is_negative = bytes[bytes.len() - 4] == 255;
×
26
    let mut multiplier: i64 = 1;
×
27
    let mut result: i64 = 0;
×
28
    let bytes: &[u8] = &bytes[4..11];
×
29
    bytes.iter().for_each(|w| {
×
30
        let number = *w as i64;
×
31
        result += number * multiplier;
×
32
        multiplier *= 256;
×
33
    });
×
34

×
35
    if is_negative {
×
36
        result = -result;
×
37
    }
×
38

39
    Ok(Field::from(
40
        Decimal::try_new(result, scale as u32).map_err(DecimalConvertError)?,
×
41
    ))
42
}
×
43

44
pub fn convert_data(
×
45
    cursor: &mut Cursor<Executed, AutocommitOn>,
×
46
    i: u16,
×
47
    column_descriptor: &ColumnDescriptor,
×
48
) -> Result<Field, SnowflakeSchemaError> {
×
49
    match column_descriptor.data_type {
×
50
        SqlDataType::SQL_CHAR | SqlDataType::SQL_VARCHAR => {
51
            match cursor
×
52
                .get_data::<String>(i)
×
53
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
54
            {
55
                None => Ok(Field::Null),
×
56
                Some(value) => Ok(Field::from(value)),
×
57
            }
58
        }
59
        SqlDataType::SQL_DECIMAL
60
        | SqlDataType::SQL_NUMERIC
61
        | SqlDataType::SQL_INTEGER
62
        | SqlDataType::SQL_SMALLINT => match column_descriptor.decimal_digits {
×
63
            None => {
64
                match cursor
×
65
                    .get_data::<i64>(i)
×
66
                    .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
67
                {
68
                    None => Ok(Field::Null),
×
69
                    Some(value) => Ok(Field::from(value)),
×
70
                }
71
            }
72
            Some(digits) => {
×
73
                match cursor
×
74
                    .get_data::<&[u8]>(i)
×
75
                    .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
76
                {
77
                    None => Ok(Field::Null),
×
78
                    Some(value) => convert_decimal(value, digits),
×
79
                }
80
            }
81
        },
82
        SqlDataType::SQL_FLOAT | SqlDataType::SQL_REAL | SqlDataType::SQL_DOUBLE => {
83
            match cursor
×
84
                .get_data::<f64>(i)
×
85
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
86
            {
87
                None => Ok(Field::Null),
×
88
                Some(value) => Ok(Field::from(value)),
×
89
            }
90
        }
91
        SqlDataType::SQL_TIMESTAMP => {
92
            match cursor
×
93
                .get_data::<SQL_TIMESTAMP_STRUCT>(i)
×
94
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
95
            {
96
                None => Ok(Field::Null),
×
97
                Some(value) => {
×
98
                    let date = NaiveDate::from_ymd(
×
99
                        value.year as i32,
×
100
                        value.month as u32,
×
101
                        value.day as u32,
×
102
                    );
×
103
                    let time = NaiveTime::from_hms_nano(
×
104
                        value.hour as u32,
×
105
                        value.minute as u32,
×
106
                        value.second as u32,
×
107
                        value.fraction,
×
108
                    );
×
109
                    Ok(Field::from(NaiveDateTime::new(date, time)))
×
110
                }
111
            }
112
        }
113
        SqlDataType::SQL_DATE => {
114
            match cursor
×
115
                .get_data::<SQL_DATE_STRUCT>(i)
×
116
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
117
            {
118
                None => Ok(Field::Null),
×
119
                Some(value) => {
×
120
                    let date = NaiveDate::from_ymd(
×
121
                        value.year as i32,
×
122
                        value.month as u32,
×
123
                        value.day as u32,
×
124
                    );
×
125
                    Ok(Field::from(date))
×
126
                }
127
            }
128
        }
129
        SqlDataType::SQL_EXT_BIT => {
130
            match cursor
×
131
                .get_data::<bool>(i)
×
132
                .map_err(|e| SnowflakeSchemaError::ValueConversionError(Box::new(e)))?
×
133
            {
134
                None => Ok(Field::Null),
×
135
                Some(v) => Ok(Field::from(v)),
×
136
            }
137
        }
138
        _ => Err(SnowflakeSchemaError::ColumnTypeNotSupported(format!(
×
139
            "{:?}",
×
140
            &column_descriptor.data_type
×
141
        ))),
×
142
    }
143
}
×
144

145
pub struct ResultIterator<'a, 'b> {
146
    stmt: Option<Statement<'a, 'b, Executed, HasResult, AutocommitOn>>,
147
    cols: i16,
148
    schema: Vec<ColumnDescriptor>,
149
}
150

151
impl<'a, 'b> ResultIterator<'a, 'b> {
152
    pub fn close_cursor(&mut self) -> Result<(), SnowflakeError> {
×
153
        self.stmt
×
154
            .take()
×
155
            .unwrap()
×
156
            .close_cursor()
×
157
            .map_or_else(|e| Err(QueryError(Box::new(e))), |_| Ok(()))
×
158
    }
×
159
}
160

161
impl Iterator for ResultIterator<'_, '_> {
162
    type Item = Vec<Field>;
163

164
    fn next(&mut self) -> Option<Self::Item> {
165
        return if let Some(ref mut stmt) = self.stmt {
×
166
            match stmt.fetch().unwrap() {
×
167
                None => None,
×
168
                Some(mut cursor) => {
×
169
                    let mut values = vec![];
×
170
                    for i in 1..(self.cols + 1) {
×
171
                        let descriptor = self.schema.get((i - 1) as usize)?;
×
172
                        let value = convert_data(&mut cursor, i as u16, descriptor).unwrap();
×
173
                        values.push(value);
×
174
                    }
175

176
                    Some(values)
×
177
                }
178
            }
179
        } else {
180
            None
×
181
        };
182
    }
×
183
}
184

185
/// Snowflake client holding the ODBC connection string built by `Client::new`.
pub struct Client {
    /// `key=value` pairs joined by `;`, passed to the ODBC driver.
    conn_string: String,
}
188

189
impl Client {
190
    pub fn new(config: &SnowflakeConfig) -> Self {
×
191
        let mut conn_hashmap: HashMap<String, String> = HashMap::new();
×
192
        let driver = match &config.driver {
×
193
            None => "Snowflake".to_string(),
×
194
            Some(driver) => driver.to_string(),
×
195
        };
196
        conn_hashmap.insert("Driver".to_string(), driver);
×
197
        conn_hashmap.insert("Server".to_string(), config.clone().server);
×
198
        conn_hashmap.insert("Port".to_string(), config.clone().port);
×
199
        conn_hashmap.insert("Uid".to_string(), config.clone().user);
×
200
        conn_hashmap.insert("Pwd".to_string(), config.clone().password);
×
201
        conn_hashmap.insert("Schema".to_string(), config.clone().schema);
×
202
        conn_hashmap.insert("Warehouse".to_string(), config.clone().warehouse);
×
203
        conn_hashmap.insert("Database".to_string(), config.clone().database);
×
204
        conn_hashmap.insert("Role".to_string(), "ACCOUNTADMIN".to_string());
×
205

×
206
        let mut parts = vec![];
×
207
        conn_hashmap.keys().into_iter().for_each(|k| {
×
208
            parts.push(format!("{}={}", k, conn_hashmap.get(k).unwrap()));
×
209
        });
×
210

×
211
        let conn_string = parts.join(";");
×
212

×
213
        debug!("Snowflake conn string: {:?}", conn_string);
×
214

215
        Self { conn_string }
×
216
    }
×
217

218
    pub fn get_conn_string(&self) -> String {
×
219
        self.conn_string.clone()
×
220
    }
×
221

222
    pub fn exec(
×
223
        &self,
×
224
        conn: &Connection<AutocommitOn>,
×
225
        query: String,
×
226
    ) -> Result<Option<bool>, SnowflakeError> {
×
227
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
228

229
        let result = stmt
×
230
            .exec_direct(&query)
×
231
            .map_err(|e| QueryError(Box::new(e)))?;
×
232
        match result {
×
233
            Data(_) => Ok(Some(true)),
×
234
            NoData(_) => Ok(None),
×
235
        }
236
    }
×
237

238
    pub fn exec_stream_creation(
×
239
        &self,
×
240
        conn: &Connection<AutocommitOn>,
×
241
        query: String,
×
242
    ) -> Result<bool, SnowflakeError> {
×
243
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
244

245
        let result = stmt.exec_direct(&query);
×
246

×
247
        result.map_or_else(
×
248
            |e| {
×
249
                if e.get_native_error() == 2203 {
×
250
                    Ok(false)
×
251
                } else if e.get_native_error() == 707 {
×
252
                    Err(SnowflakeStreamError(TimeTravelNotAvailableError))
×
253
                } else {
254
                    Err(QueryError(Box::new(e)))
×
255
                }
256
            },
×
257
            |_| Ok(true),
×
258
        )
×
259
    }
×
260

261
    pub fn parse_stream_creation_error(e: DiagnosticRecord) -> Result<bool, SnowflakeError> {
×
262
        if e.get_native_error() == 2203 {
×
263
            Ok(false)
×
264
        } else {
265
            Err(QueryError(Box::new(e)))
×
266
        }
267
    }
×
268

269
    fn parse_not_exist_error(e: DiagnosticRecord) -> Result<bool, SnowflakeError> {
×
270
        if e.get_native_error() == 2003 {
×
271
            Ok(false)
×
272
        } else {
273
            Err(QueryError(Box::new(e)))
×
274
        }
275
    }
×
276

277
    fn parse_exist(result: ResultSetState<Executed, AutocommitOn>) -> bool {
×
278
        match result {
×
279
            Data(mut x) => x.fetch().unwrap().is_some(),
×
280
            NoData(_) => false,
×
281
        }
282
    }
×
283

284
    pub fn stream_exist(
×
285
        &self,
×
286
        conn: &Connection<AutocommitOn>,
×
287
        stream_name: &String,
×
288
    ) -> Result<bool, SnowflakeError> {
×
289
        let query = format!("SHOW STREAMS LIKE '{stream_name}';");
×
290

291
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
292
        stmt.exec_direct(&query)
×
293
            .map_or_else(Self::parse_not_exist_error, |result| {
×
294
                Ok(Self::parse_exist(result))
×
295
            })
×
296
    }
×
297

298
    pub fn table_exist(
×
299
        &self,
×
300
        conn: &Connection<AutocommitOn>,
×
301
        table_name: &String,
×
302
    ) -> Result<bool, SnowflakeError> {
×
303
        let query =
×
304
            format!("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{table_name}';");
×
305

306
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
307
        stmt.exec_direct(&query)
×
308
            .map_or_else(Self::parse_not_exist_error, |result| {
×
309
                Ok(Self::parse_exist(result))
×
310
            })
×
311
    }
×
312

313
    pub fn drop_stream(
×
314
        &self,
×
315
        conn: &Connection<AutocommitOn>,
×
316
        stream_name: &String,
×
317
    ) -> Result<bool, SnowflakeError> {
×
318
        let query = format!("DROP STREAM IF EXISTS {stream_name}");
×
319

320
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
321
        stmt.exec_direct(&query)
×
322
            .map_or_else(Self::parse_not_exist_error, |result| {
×
323
                Ok(Self::parse_exist(result))
×
324
            })
×
325
    }
×
326

327
    pub fn fetch<'a, 'b>(
×
328
        &self,
×
329
        conn: &'a Connection<AutocommitOn>,
×
330
        query: String,
×
331
    ) -> Result<Option<(Vec<ColumnDescriptor>, ResultIterator<'a, 'b>)>, ConnectorError> {
×
332
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
333
        // TODO: use stmt.close_cursor to improve efficiency
334

335
        match stmt
×
336
            .exec_direct(&query)
×
337
            .map_err(|e| QueryError(Box::new(e)))?
×
338
        {
339
            Data(stmt) => {
×
340
                let cols = stmt
×
341
                    .num_result_cols()
×
342
                    .map_err(|e| QueryError(Box::new(e)))?;
×
343
                let schema_result: Result<Vec<ColumnDescriptor>, SnowflakeError> = (1..(cols + 1))
×
344
                    .map(|i| {
×
345
                        let value = i.try_into();
×
346
                        match value {
×
347
                            Ok(v) => {
×
348
                                Ok(stmt.describe_col(v).map_err(|e| QueryError(Box::new(e)))?)
×
349
                            }
350
                            Err(e) => Err(SnowflakeError::SnowflakeSchemaError(
×
351
                                SchemaConversionError(e),
×
352
                            )),
×
353
                        }
354
                    })
×
355
                    .collect();
×
356

357
                let schema = schema_result?;
×
358
                Ok(Some((
×
359
                    schema.clone(),
×
360
                    ResultIterator {
×
361
                        cols,
×
362
                        stmt: Some(stmt),
×
363
                        schema,
×
364
                    },
×
365
                )))
×
366
            }
367
            NoData(_) => Ok(None),
×
368
        }
369
    }
×
370

371
    pub fn execute_query(
×
372
        &self,
×
373
        conn: &Connection<AutocommitOn>,
×
374
        query: &str,
×
375
    ) -> Result<(), SnowflakeError> {
×
376
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
377
        stmt.exec_direct(query)
×
378
            .map_err(|e| QueryError(Box::new(e)))?;
×
379
        Ok(())
×
380
    }
×
381

382
    pub fn fetch_tables(
×
383
        &self,
×
384
        tables: Option<Vec<TableInfo>>,
×
385
        tables_indexes: HashMap<String, usize>,
×
386
        keys: HashMap<String, Vec<String>>,
×
387
        conn: &Connection<AutocommitOn>,
×
388
    ) -> Result<Vec<SchemaWithChangesType>, SnowflakeError> {
×
389
        let tables_condition = tables.map_or("".to_string(), |tables| {
×
390
            let mut buf = String::new();
×
391
            buf.write_str(" AND TABLE_NAME IN(").unwrap();
×
392
            for (idx, table_info) in tables.iter().enumerate() {
×
393
                if idx > 0 {
×
394
                    buf.write_char(',').unwrap();
×
395
                }
×
396
                buf.write_str(&format!("\'{}\'", table_info.table_name))
×
397
                    .unwrap();
×
398
            }
399
            buf.write_char(')').unwrap();
×
400
            buf
×
401
        });
×
402

×
403
        let query = format!(
×
404
            "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, NUMERIC_SCALE
×
405
            FROM INFORMATION_SCHEMA.COLUMNS
×
406
            WHERE TABLE_SCHEMA = 'PUBLIC' {tables_condition}
×
407
            ORDER BY TABLE_NAME, ORDINAL_POSITION"
×
408
        );
×
409

410
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
411
        match stmt
×
412
            .exec_direct(&query)
×
413
            .map_err(|e| QueryError(Box::new(e)))?
×
414
        {
415
            Data(data) => {
×
416
                let cols = data
×
417
                    .num_result_cols()
×
418
                    .map_err(|e| QueryError(Box::new(e)))?;
×
419

420
                let schema_result: Result<Vec<ColumnDescriptor>, SnowflakeError> = (1..(cols + 1))
×
421
                    .map(|i| {
×
422
                        let value = i.try_into();
×
423
                        match value {
×
424
                            Ok(v) => {
×
425
                                Ok(data.describe_col(v).map_err(|e| QueryError(Box::new(e)))?)
×
426
                            }
427
                            Err(e) => Err(SnowflakeError::SnowflakeSchemaError(
×
428
                                SchemaConversionError(e),
×
429
                            )),
×
430
                        }
431
                    })
×
432
                    .collect();
×
433

434
                let schema = schema_result?;
×
435

436
                let mut schemas: HashMap<String, Schema> = HashMap::new();
×
437
                let iterator = ResultIterator {
×
438
                    cols,
×
439
                    stmt: Some(data),
×
440
                    schema,
×
441
                };
×
442

443
                for row_data in iterator {
×
444
                    let empty = "".to_string();
×
445
                    let table_name = if let Field::String(table_name) = &row_data.get(1).unwrap() {
×
446
                        table_name
×
447
                    } else {
448
                        &empty
×
449
                    };
450
                    let field_name = if let Field::String(field_name) = &row_data.get(2).unwrap() {
×
451
                        field_name
×
452
                    } else {
453
                        &empty
×
454
                    };
455
                    let type_name = if let Field::String(type_name) = &row_data.get(3).unwrap() {
×
456
                        type_name
×
457
                    } else {
458
                        &empty
×
459
                    };
460
                    let nullable = if let Field::Boolean(b) = &row_data.get(4).unwrap() {
×
461
                        b
×
462
                    } else {
463
                        &false
×
464
                    };
465
                    let scale = if let Field::Int(scale) = &row_data.get(5).unwrap() {
×
466
                        Some(*scale)
×
467
                    } else {
468
                        None
×
469
                    };
470

471
                    let schema_id = *tables_indexes.get(&table_name.clone()).unwrap();
×
472

×
473
                    schemas
×
474
                        .entry(table_name.clone())
×
475
                        .or_insert(Schema {
×
476
                            identifier: Some(SchemaIdentifier {
×
477
                                id: schema_id as u32,
×
478
                                version: 0,
×
479
                            }),
×
480
                            fields: vec![],
×
481
                            primary_index: vec![],
×
482
                        })
×
483
                        .fields
×
484
                        .push(FieldDefinition {
×
485
                            name: field_name.clone(),
×
486
                            typ: SchemaHelper::map_schema_type(type_name, scale)?,
×
487
                            nullable: *nullable,
×
488
                            source: SourceDefinition::Dynamic,
×
489
                        })
490
                }
491

492
                Ok(schemas
×
493
                    .into_iter()
×
494
                    .map(|(name, mut schema)| {
×
495
                        let mut indexes = vec![];
×
496
                        keys.get(&name).map_or((), |columns| {
×
497
                            schema.fields.iter().enumerate().for_each(|(idx, f)| {
×
498
                                if columns.contains(&f.name) {
×
499
                                    indexes.push(idx);
×
500
                                }
×
501
                            });
×
502
                        });
×
503

×
504
                        let replication_type = if indexes.is_empty() {
×
505
                            ReplicationChangesTrackingType::Nothing
×
506
                        } else {
×
507
                            ReplicationChangesTrackingType::FullChanges
×
508
                        };
×
509

510
                        schema.primary_index = indexes;
×
511

×
512
                        (name, schema, replication_type)
×
513
                    })
×
514
                    .collect())
×
515
            }
×
516
            NoData(_) => Ok(vec![]),
×
517
        }
×
518
    }
×
519

×
520
    pub fn fetch_keys(
×
521
        &self,
×
522
        conn: &Connection<AutocommitOn>,
×
523
    ) -> Result<HashMap<String, Vec<String>>, SnowflakeError> {
×
524
        let stmt = Statement::with_parent(conn).map_err(|e| QueryError(Box::new(e)))?;
×
525
        match stmt
×
526
            .exec_direct("SHOW PRIMARY KEYS IN SCHEMA")
×
527
            .map_err(|e| QueryError(Box::new(e)))?
×
528
        {
×
529
            Data(data) => {
×
530
                let cols = data
×
531
                    .num_result_cols()
×
532
                    .map_err(|e| QueryError(Box::new(e)))?;
×
533

×
534
                let schema_result: Result<Vec<ColumnDescriptor>, SnowflakeError> = (1..(cols + 1))
×
535
                    .map(|i| {
×
536
                        let value = i.try_into();
×
537
                        match value {
×
538
                            Ok(v) => {
×
539
                                Ok(data.describe_col(v).map_err(|e| QueryError(Box::new(e)))?)
×
540
                            }
×
541
                            Err(e) => Err(SnowflakeError::SnowflakeSchemaError(
×
542
                                SchemaConversionError(e),
×
543
                            )),
×
544
                        }
×
545
                    })
×
546
                    .collect();
×
547

×
548
                let schema = schema_result?;
×
549

×
550
                let mut keys: HashMap<String, Vec<String>> = HashMap::new();
×
551
                let iterator = ResultIterator {
×
552
                    cols,
×
553
                    stmt: Some(data),
×
554
                    schema,
×
555
                };
×
556

×
557
                for row_data in iterator {
×
558
                    let empty = "".to_string();
×
559
                    let table_name = row_data.get(3).map_or(empty.clone(), |v| match v {
×
560
                        Field::String(v) => v.clone(),
×
561
                        _ => empty.clone(),
×
562
                    });
×
563
                    let column_name = row_data.get(4).map_or(empty.clone(), |v| match v {
×
564
                        Field::String(v) => v.clone(),
×
565
                        _ => empty.clone(),
×
566
                    });
×
567

×
568
                    keys.entry(table_name).or_default().push(column_name);
×
569
                }
×
570

571
                Ok(keys)
×
572
            }
573
            NoData(_) => Ok(HashMap::new()),
×
574
        }
575
    }
×
576
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc