• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4195006582

pending completion
4195006582

Pull #918

github

GitHub
Merge 14ceac9a6 into dc166625f
Pull Request #918: ObjectStore - Reduce code duplication & more clone prevention

177 of 177 new or added lines in 5 files covered. (100.0%)

24368 of 37519 relevant lines covered (64.95%)

43913.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/object_store/connector.rs
1
use dozer_types::ingestion_types::{LocalStorage, S3Storage};
2
use std::collections::HashMap;
3
use std::sync::Arc;
4

5
use crate::connectors::object_store::schema_mapper::{Mapper, SchemaMapper};
6
use crate::connectors::object_store::table_reader::{Reader, TableReader};
7
use crate::connectors::TableInfo;
8
use crate::errors::ConnectorError;
9
use crate::{connectors::Connector, errors, ingestion::Ingestor};
10
use dozer_types::parking_lot::RwLock;
11

12
pub struct ObjectStoreConnector<T: Clone> {
13
    pub id: u64,
14
    config: T,
15
    ingestor: Option<Arc<RwLock<Ingestor>>>,
16
    tables: Option<Vec<TableInfo>>,
17
}
18

19
impl<T: Clone> ObjectStoreConnector<T> {
20
    pub fn new(id: u64, config: T) -> Self {
×
21
        Self {
×
22
            id,
×
23
            config,
×
24
            ingestor: None,
×
25
            tables: None,
×
26
        }
×
27
    }
×
28
}
29

30
impl Connector for ObjectStoreConnector<S3Storage> {
31
    fn validate(&self, _tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
32
        Ok(())
×
33
    }
×
34

35
    fn validate_schemas(
×
36
        &self,
×
37
        _tables: &[TableInfo],
×
38
    ) -> Result<crate::connectors::ValidationResults, errors::ConnectorError> {
×
39
        Ok(HashMap::new())
×
40
    }
×
41

42
    fn get_schemas(
×
43
        &self,
×
44
        table_names: Option<Vec<TableInfo>>,
×
45
    ) -> Result<Vec<dozer_types::types::SchemaWithChangesType>, ConnectorError> {
×
46
        let mapper = SchemaMapper::new(self.config.clone());
×
47
        mapper.get_schema(table_names)
×
48
    }
×
49

×
50
    fn initialize(
×
51
        &mut self,
×
52
        ingestor: Arc<RwLock<Ingestor>>,
×
53
        tables: Option<Vec<TableInfo>>,
×
54
    ) -> Result<(), ConnectorError> {
×
55
        self.ingestor = Some(ingestor);
×
56
        self.tables = tables;
×
57
        Ok(())
×
58
    }
×
59

×
60
    fn start(&self, _from_seq: Option<(u64, u64)>) -> Result<(), ConnectorError> {
×
61
        let tables = match &self.tables {
×
62
            Some(tables) if !tables.is_empty() => tables,
×
63
            _ => return Ok(()),
×
64
        };
×
65

×
66
        let ingestor = self
×
67
            .ingestor
×
68
            .as_ref()
×
69
            .ok_or(ConnectorError::InitializationError)?;
×
70

71
        TableReader::new(self.config.clone()).read_tables(tables, ingestor)
×
72
    }
×
73

×
74
    fn get_tables(&self, _tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
×
75
        todo!()
×
76
    }
×
77
}
×
78

×
79
impl Connector for ObjectStoreConnector<LocalStorage> {
80
    fn validate(&self, _tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
81
        Ok(())
×
82
    }
×
83

84
    fn validate_schemas(
×
85
        &self,
×
86
        _tables: &[crate::connectors::TableInfo],
×
87
    ) -> Result<crate::connectors::ValidationResults, errors::ConnectorError> {
×
88
        Ok(HashMap::new())
×
89
    }
×
90

×
91
    fn get_schemas(
×
92
        &self,
×
93
        table_names: Option<Vec<TableInfo>>,
×
94
    ) -> Result<Vec<dozer_types::types::SchemaWithChangesType>, ConnectorError> {
×
95
        let mapper = SchemaMapper::new(self.config.clone());
×
96
        mapper.get_schema(table_names)
×
97
    }
×
98

×
99
    fn initialize(
×
100
        &mut self,
×
101
        ingestor: Arc<RwLock<Ingestor>>,
×
102
        tables: Option<Vec<TableInfo>>,
×
103
    ) -> Result<(), ConnectorError> {
×
104
        self.ingestor = Some(ingestor);
×
105
        self.tables = tables;
×
106
        Ok(())
×
107
    }
×
108

×
109
    fn start(&self, _from_seq: Option<(u64, u64)>) -> Result<(), ConnectorError> {
×
110
        let tables = match &self.tables {
×
111
            Some(tables) if !tables.is_empty() => tables,
×
112
            _ => return Ok(()),
×
113
        };
×
114

×
115
        let ingestor = self
×
116
            .ingestor
×
117
            .as_ref()
×
118
            .ok_or(ConnectorError::InitializationError)?;
×
119

×
120
        TableReader::new(self.config.clone()).read_tables(tables, ingestor)
×
121
    }
×
122

×
123
    fn get_tables(&self, _tables: Option<&[TableInfo]>) -> Result<Vec<TableInfo>, ConnectorError> {
×
124
        todo!()
×
125
    }
×
126
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc