• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4195006582

pending completion
4195006582

Pull #918

github

GitHub
Merge 14ceac9a6 into dc166625f
Pull Request #918: ObjectStore - Reduce code duplication & more clone prevention

177 of 177 new or added lines in 5 files covered. (100.0%)

24368 of 37519 relevant lines covered (64.95%)

43913.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/object_store/table_reader.rs
1
use crate::connectors::object_store::helper::{get_details, get_table, map_listing_options};
2
use crate::connectors::object_store::schema_helper::map_value_to_dozer_field;
3
use crate::connectors::{ColumnInfo, TableInfo};
4
use crate::errors::ObjectStoreConnectorError::TableReaderError;
5
use crate::errors::ObjectStoreObjectError::ListingPathParsingError;
6
use crate::errors::ObjectStoreTableReaderError::{
7
    ColumnsSelectFailed, StreamExecutionError, TableReadFailed,
8
};
9
use crate::errors::{ConnectorError, ObjectStoreConnectorError};
10
use crate::ingestion::Ingestor;
11
use datafusion::datasource::listing::{
12
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
13
};
14
use datafusion::prelude::SessionContext;
15
use dozer_types::ingestion_types::{IngestionMessage, LocalStorage, S3Storage, Table};
16
use dozer_types::parking_lot::RwLock;
17
use dozer_types::types::{Operation, Record, SchemaIdentifier};
18
use futures::StreamExt;
19
use object_store::aws::{AmazonS3, AmazonS3Builder};
20
use object_store::local::LocalFileSystem;
21
use object_store::ObjectStore;
22
use std::sync::Arc;
23
use tokio::runtime::Runtime;
24

25
/// Reader over an object-store backed source, parameterised by its storage configuration.
///
/// `T` is the connector configuration; the `Reader` implementation in this file asks it for
/// the per-table parameters needed to ingest each table.
pub struct TableReader<T>
where
    T: Clone + Send + Sync,
{
    /// Storage configuration this reader was created with.
    config: T,
}
28

29
impl<T: Clone + Send + Sync> TableReader<T> {
30
    pub fn new(config: T) -> TableReader<T> {
×
31
        Self { config }
×
32
    }
×
33

×
34
    pub async fn read(
×
35
        id: u32,
×
36
        ctx: SessionContext,
×
37
        table_path: ListingTableUrl,
×
38
        listing_options: ListingOptions,
×
39
        ingestor: Arc<RwLock<Ingestor>>,
×
40
        table: &TableInfo,
×
41
    ) -> Result<(), ObjectStoreConnectorError> {
×
42
        let resolved_schema = listing_options
×
43
            .infer_schema(&ctx.state(), &table_path)
×
44
            .await
×
45
            .map_err(|_| ObjectStoreConnectorError::InternalError)?;
×
46

×
47
        let mut idx = 0;
×
48
        let fields = resolved_schema.all_fields();
×
49

×
50
        let config = ListingTableConfig::new(table_path)
×
51
            .with_listing_options(listing_options)
×
52
            .with_schema(resolved_schema.clone());
×
53

×
54
        let provider = Arc::new(
×
55
            ListingTable::try_new(config)
×
56
                .map_err(ObjectStoreConnectorError::InternalDataFusionError)?,
×
57
        );
58

×
59
        let columns: Vec<ColumnInfo> = match &table.columns {
×
60
            Some(columns_list) if !columns_list.is_empty() => columns_list.clone(),
×
61
            _ => fields
×
62
                .iter()
×
63
                .map(|f| ColumnInfo {
×
64
                    name: f.name().to_string(),
×
65
                    data_type: Some(f.data_type().to_string()),
×
66
                })
×
67
                .collect(),
×
68
        };
69

×
70
        let cols: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect();
×
71
        let data = ctx
×
72
            .read_table(provider.clone())
×
73
            .map_err(|e| TableReaderError(TableReadFailed(e)))?
×
74
            .select_columns(&cols)
×
75
            .map_err(|e| TableReaderError(ColumnsSelectFailed(e)))?
×
76
            .execute_stream()
×
77
            .await
×
78
            .map_err(|e| TableReaderError(StreamExecutionError(e)))?;
×
79

×
80
        tokio::pin!(data);
×
81

×
82
        while let Some(Ok(batch)) = data.next().await {
×
83
            for row in 0..batch.num_rows() {
×
84
                let fields = batch
×
85
                    .columns()
×
86
                    .iter()
×
87
                    .enumerate()
×
88
                    .map(|(col, column)| {
×
89
                        map_value_to_dozer_field(column, &row, resolved_schema.field(col).name())
×
90
                    })
×
91
                    .collect::<Result<Vec<_>, _>>()?;
×
92

93
                ingestor
×
94
                    .write()
×
95
                    .handle_message((
×
96
                        (0_u64, idx),
×
97
                        IngestionMessage::OperationEvent(Operation::Insert {
×
98
                            new: Record {
×
99
                                schema_id: Some(SchemaIdentifier { id, version: 0 }),
×
100
                                values: fields,
×
101
                                version: None,
×
102
                            },
×
103
                        }),
×
104
                    ))
×
105
                    .unwrap();
×
106

×
107
                idx += 1;
×
108
            }
×
109
        }
110

111
        Ok(())
×
112
    }
×
113
}
×
114

×
115
pub trait DozerObjectStore: Clone + Send + Sync {
×
116
    type ObjectStore: ObjectStore;
×
117

118
    fn table_params<'a>(
119
        &'a self,
120
        table_name: &str,
121
    ) -> Result<DozerObjectStoreParams<'a, Self::ObjectStore>, ConnectorError>;
122
}
123

124
pub struct DozerObjectStoreParams<'a, T: ObjectStore> {
125
    pub scheme: &'static str,
126
    pub host: &'a str,
127
    pub object_store: T,
128
    pub table_path: String,
×
129
    pub data_fusion_table: &'a Table,
×
130
}
×
131

×
132
pub trait Reader<T> {
×
133
    fn read_tables(
×
134
        &self,
×
135
        tables: &[TableInfo],
×
136
        ingestor: &Arc<RwLock<Ingestor>>,
×
137
    ) -> Result<(), ConnectorError>;
×
138
}
×
139

×
140
impl<T: DozerObjectStore> Reader<T> for TableReader<T> {
×
141
    fn read_tables(
×
142
        &self,
×
143
        tables: &[TableInfo],
×
144
        ingestor: &Arc<RwLock<Ingestor>>,
×
145
    ) -> Result<(), ConnectorError> {
×
146
        for (id, table) in tables.iter().enumerate() {
×
147
            let params = self.config.table_params(&table.name)?;
×
148

149
            let table_path = ListingTableUrl::parse(params.table_path).map_err(|_| {
×
150
                ObjectStoreConnectorError::DataFusionStorageObjectError(ListingPathParsingError)
×
151
            })?;
×
152

×
153
            let listing_options = map_listing_options(params.data_fusion_table);
×
154

×
155
            let rt = Runtime::new().map_err(|_| ObjectStoreConnectorError::RuntimeCreationError)?;
×
156

×
157
            let ctx = SessionContext::new();
×
158

×
159
            ctx.runtime_env().register_object_store(
×
160
                params.scheme,
×
161
                params.host,
×
162
                Arc::new(params.object_store),
×
163
            );
×
164

×
165
            rt.block_on(Self::read(
×
166
                id as u32,
×
167
                ctx,
×
168
                table_path,
×
169
                listing_options,
×
170
                ingestor.clone(),
×
171
                table,
×
172
            ))?;
×
173
        }
×
174

×
175
        Ok(())
×
176
    }
×
177
}
178

×
179
impl DozerObjectStore for S3Storage {
×
180
    type ObjectStore = AmazonS3;
×
181

×
182
    fn table_params(
×
183
        &self,
×
184
        table_name: &str,
×
185
    ) -> Result<DozerObjectStoreParams<Self::ObjectStore>, ConnectorError> {
×
186
        let table = get_table(&self.tables, table_name)?;
×
187
        let details = get_details(&self.details)?;
×
188

×
189
        let object_store = AmazonS3Builder::new()
×
190
            .with_bucket_name(&details.bucket_name)
×
191
            .with_region(&details.region)
×
192
            .with_access_key_id(&details.access_key_id)
×
193
            .with_secret_access_key(&details.secret_access_key)
×
194
            .build()
×
195
            .map_err(|_| ConnectorError::InitializationError)?;
×
196

×
197
        Ok(DozerObjectStoreParams {
×
198
            scheme: "s3",
×
199
            host: &details.bucket_name,
×
200
            object_store,
×
201
            table_path: format!("s3://{}/{}/", details.bucket_name, table.prefix),
×
202
            data_fusion_table: table,
×
203
        })
×
204
    }
×
205
}
×
206

×
207
impl DozerObjectStore for LocalStorage {
×
208
    type ObjectStore = LocalFileSystem;
×
209

×
210
    fn table_params(
×
211
        &self,
×
212
        table_name: &str,
×
213
    ) -> Result<DozerObjectStoreParams<Self::ObjectStore>, ConnectorError> {
×
214
        let table = get_table(&self.tables, table_name)?;
×
215
        let path = get_details(&self.details)?.path.as_str();
×
216

217
        let object_store = LocalFileSystem::new_with_prefix(path)
×
218
            .map_err(|_| ConnectorError::InitializationError)?;
×
219

×
220
        Ok(DozerObjectStoreParams {
×
221
            scheme: "local",
×
222
            host: path,
×
223
            object_store,
×
224
            table_path: format!("s3://{path}/{}/", table.prefix),
×
225
            data_fusion_table: table,
×
226
        })
×
227
    }
×
228
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc