• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4195006582

pending completion
4195006582

Pull #918

github

GitHub
Merge 14ceac9a6 into dc166625f
Pull Request #918: ObjectStore - Reduce code duplication & more clone prevention

177 of 177 new or added lines in 5 files covered. (100.0%)

24368 of 37519 relevant lines covered (64.95%)

43913.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
1
use crate::connectors::object_store::helper::{get_details, map_listing_options};
2
use crate::connectors::object_store::schema_helper::map_schema_to_dozer;
3
use crate::connectors::TableInfo;
4
use crate::errors::ObjectStoreObjectError::{ListingPathParsingError, TableDefinitionNotFound};
5
use crate::errors::{ConnectorError, ObjectStoreConnectorError};
6
use crossbeam::channel;
7
use crossbeam::channel::Receiver;
8
use datafusion::arrow::datatypes::SchemaRef;
9
use datafusion::datasource::listing::ListingTableUrl;
10
use datafusion::prelude::SessionContext;
11
use dozer_types::ingestion_types::{LocalStorage, S3Storage, Table};
12
use dozer_types::log::error;
13
use dozer_types::types::ReplicationChangesTrackingType::Nothing;
14
use dozer_types::types::{Schema, SchemaIdentifier, SchemaWithChangesType};
15
use object_store::aws::AmazonS3Builder;
16
use object_store::local::LocalFileSystem;
17
use std::collections::HashMap;
18
use std::sync::Arc;
19
use tokio::runtime::Runtime;
20

21
/// Infers Dozer schemas for object-store backed tables.
///
/// `config` is the storage configuration the mapper reads table definitions and
/// connection details from (this module implements [`Mapper`] for `S3Storage` and
/// `LocalStorage`). Trait bounds live on the `impl` blocks that need them rather than
/// on the struct definition, so naming the type imposes no extra requirements.
pub struct SchemaMapper<T> {
    config: T,
}
24

25
fn prepare_tables_for_mapping(
×
26
    tables: Option<Vec<TableInfo>>,
×
27
    tables_map: &HashMap<String, Table>,
×
28
) -> Vec<TableInfo> {
×
29
    tables.unwrap_or_else(|| {
×
30
        tables_map
×
31
            .values()
×
32
            .into_iter()
×
33
            .map(|t| TableInfo {
×
34
                name: t.name.clone(),
×
35
                table_name: t.name.clone(),
×
36
                id: 0,
×
37
                columns: None,
×
38
            })
×
39
            .collect()
×
40
    })
×
41
}
×
42

×
43
impl<T: Clone + Send + Sync> SchemaMapper<T> {
×
44
    pub fn new(config: T) -> SchemaMapper<T> {
×
45
        Self { config }
×
46
    }
×
47

×
48
    fn map_schema(
×
49
        &self,
×
50
        id: u32,
×
51
        rx: Receiver<datafusion::error::Result<SchemaRef>>,
×
52
        table: TableInfo,
×
53
    ) -> Result<Schema, ConnectorError> {
×
54
        let c = rx
×
55
            .recv()
×
56
            .map_err(|e| {
×
57
                error!("{:?}", e);
×
58
                ConnectorError::WrongConnectionConfiguration
×
59
            })?
×
60
            .map_err(|e| {
×
61
                error!("{:?}", e);
×
62
                ConnectorError::WrongConnectionConfiguration
×
63
            })?;
×
64

×
65
        let fields_list = c.fields().iter().filter(|f| match table.columns.as_ref() {
×
66
            Some(columns) if !columns.is_empty() => columns.iter().any(|c| &c.name == f.name()),
×
67
            _ => true,
×
68
        });
×
69

×
70
        let fields = map_schema_to_dozer(fields_list)
×
71
            .map_err(ObjectStoreConnectorError::DataFusionSchemaError)?;
×
72

×
73
        Ok(Schema {
×
74
            identifier: Some(SchemaIdentifier { id, version: 0 }),
×
75
            fields,
×
76
            primary_index: vec![],
×
77
        })
×
78
    }
×
79
}
80

81
pub trait Mapper<T> {
82
    fn get_schema(
83
        &self,
84
        tables: Option<Vec<TableInfo>>,
×
85
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError>;
×
86
}
×
87

×
88
impl Mapper<S3Storage> for SchemaMapper<S3Storage> {
×
89
    fn get_schema(
×
90
        &self,
×
91
        tables: Option<Vec<TableInfo>>,
×
92
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
×
93
        let tables_map: HashMap<String, Table> = self
×
94
            .config
×
95
            .tables
×
96
            .clone()
×
97
            .into_iter()
×
98
            .map(|table| (table.name.clone(), table))
×
99
            .collect();
×
100

×
101
        let tables_list = prepare_tables_for_mapping(tables, &tables_map);
×
102

×
103
        let details = get_details(&self.config.details)?;
×
104
        let mut schemas = vec![];
×
105

×
106
        for (id, table) in tables_list.iter().enumerate() {
×
107
            let data_fusion_table = get_table(&tables_map, table)?;
×
108
            let path = format!("s3://{}/{}/", details.bucket_name, data_fusion_table.prefix);
×
109

×
110
            let table_path = get_table_path(path)?;
×
111

×
112
            let listing_options = map_listing_options(data_fusion_table);
×
113

×
114
            let (tx, rx) = channel::bounded(1);
×
115

×
116
            let rt = Runtime::new().map_err(|_| ObjectStoreConnectorError::RuntimeCreationError)?;
×
117

118
            let details = details.clone();
×
119
            let ctx = SessionContext::new();
×
120
            let s3 = AmazonS3Builder::new()
×
121
                .with_bucket_name(details.bucket_name.to_owned())
×
122
                .with_region(details.region.to_owned())
×
123
                .with_access_key_id(details.access_key_id.to_owned())
×
124
                .with_secret_access_key(details.secret_access_key.to_owned())
×
125
                .build()
×
126
                .map_or(Err(ConnectorError::InitializationError), Ok)?;
×
127

×
128
            rt.block_on(async move {
×
129
                ctx.runtime_env()
×
130
                    .register_object_store("s3", &details.bucket_name, Arc::new(s3));
×
131

×
132
                let resolved_schema = listing_options
×
133
                    .infer_schema(&ctx.state(), &table_path)
×
134
                    .await;
×
135

136
                tx.send(resolved_schema)
×
137
                    .map_err(|_| {
×
138
                        ConnectorError::DataFusionConnectorError(
×
139
                            ObjectStoreConnectorError::InternalError,
×
140
                        )
×
141
                    })
×
142
                    .unwrap();
×
143
            });
×
144

×
145
            let schema = self.map_schema(id as u32, rx, table.clone())?;
×
146
            schemas.push((table.table_name.clone(), schema, Nothing))
×
147
        }
×
148

×
149
        Ok(schemas)
×
150
    }
×
151
}
×
152

×
153
impl Mapper<LocalStorage> for SchemaMapper<LocalStorage> {
154
    fn get_schema(
×
155
        &self,
×
156
        tables: Option<Vec<TableInfo>>,
×
157
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
×
158
        let tables_map: HashMap<String, Table> = self
×
159
            .config
×
160
            .tables
×
161
            .clone()
×
162
            .into_iter()
×
163
            .map(|table| (table.name.clone(), table))
×
164
            .collect();
×
165

×
166
        let tables_list = prepare_tables_for_mapping(tables, &tables_map);
×
167

×
168
        let details = get_details(&self.config.details)?;
×
169

170
        let mut schemas = vec![];
×
171
        for (id, table) in tables_list.iter().enumerate() {
×
172
            let data_fusion_table = get_table(&tables_map, table)?;
×
173
            let path = format!("{}/{}/", details.path.clone(), data_fusion_table.prefix);
×
174

×
175
            let listing_options = map_listing_options(data_fusion_table);
×
176

×
177
            let (tx, rx) = channel::bounded(1);
×
178

×
179
            let rt = Runtime::new().map_err(|_| ObjectStoreConnectorError::RuntimeCreationError)?;
×
180

×
181
            let details = details.clone();
×
182
            let ctx = SessionContext::new();
×
183
            let ls = LocalFileSystem::new_with_prefix(details.path.clone())
×
184
                .map_or(Err(ConnectorError::InitializationError), Ok)?;
×
185
            ctx.runtime_env()
×
186
                .register_object_store("local", &details.path, Arc::new(ls));
×
187

×
188
            let table_path = get_table_path(path)?;
×
189

×
190
            rt.block_on(async move {
×
191
                let resolved_schema = listing_options
×
192
                    .infer_schema(&ctx.state(), &table_path)
×
193
                    .await;
×
194

×
195
                tx.send(resolved_schema)
×
196
                    .map_err(|_| {
×
197
                        ConnectorError::DataFusionConnectorError(
×
198
                            ObjectStoreConnectorError::InternalError,
×
199
                        )
×
200
                    })
×
201
                    .unwrap()
×
202
            });
×
203

×
204
            let schema = self.map_schema(id as u32, rx, table.clone())?;
×
205
            schemas.push((table.table_name.clone(), schema, Nothing))
×
206
        }
×
207

×
208
        Ok(schemas)
×
209
    }
×
210
}
×
211

×
212
fn get_table<'a>(
×
213
    tables_map: &'a HashMap<String, Table>,
×
214
    table: &TableInfo,
×
215
) -> Result<&'a Table, ObjectStoreConnectorError> {
×
216
    tables_map.get(&table.table_name).ok_or(
×
217
        ObjectStoreConnectorError::DataFusionStorageObjectError(TableDefinitionNotFound),
×
218
    )
×
219
}
×
220

×
221
fn get_table_path(path: String) -> Result<ListingTableUrl, ObjectStoreConnectorError> {
×
222
    ListingTableUrl::parse(path).map_err(|_| {
×
223
        ObjectStoreConnectorError::DataFusionStorageObjectError(ListingPathParsingError)
×
224
    })
×
225
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc