• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5659039553

pending completion
5659039553

Pull #1792

github

chubei
chore: Change `make_from!` in `from_arrow` to func to improve readability
Pull Request #1792: chore: Change `make_from!` in `from_arrow` to func to improve readability

31 of 31 new or added lines in 4 files covered. (100.0%)

45385 of 58700 relevant lines covered (77.32%)

39368.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/object_store/delta/delta_table.rs
1
use std::{collections::HashMap, sync::Arc};
2

3
use deltalake::{datafusion::prelude::SessionContext, s3_storage_options};
4
use dozer_types::chrono::{DateTime, Utc};
5
use dozer_types::ingestion_types::IngestionMessageKind;
6
use dozer_types::{
7
    arrow_types::from_arrow::{map_schema_to_dozer, map_value_to_dozer_field},
8
    ingestion_types::DeltaConfig,
9
    tracing::error,
10
    types::{Operation, Record},
11
};
12
use futures::StreamExt;
13
use tokio::sync::mpsc::Sender;
14
use tokio::task::JoinHandle;
15
use tonic::async_trait;
16

17
use crate::{
18
    connectors::{
19
        object_store::{adapters::DozerObjectStore, table_watcher::TableWatcher},
20
        TableInfo,
21
    },
22
    errors::{ConnectorError, ObjectStoreConnectorError},
23
};
24

25
pub struct DeltaTable<T: DozerObjectStore + Send> {
26
    _table_config: DeltaConfig,
27
    store_config: T,
28
}
29

30
impl<T: DozerObjectStore + Send> DeltaTable<T> {
31
    pub fn new(table_config: DeltaConfig, store_config: T) -> Self {
×
32
        Self {
×
33
            _table_config: table_config,
×
34
            store_config,
×
35
        }
×
36
    }
×
37
}
38

39
#[async_trait]
40
impl<T: DozerObjectStore + Send> TableWatcher for DeltaTable<T> {
41
    // async fn watch(
42
    //     &self,
43
    //     id: usize,
44
    //     table: &TableInfo,
45
    //     config: &impl DozerObjectStore,
46
    //     ingestor: &Ingestor,
47
    // ) -> Result<(), ConnectorError> {
48
    //     let (tx, mut rx) = channel(16);
49

50
    //     let params = config.table_params(&table.name)?;
51

52
    //     let data = self.snapshot_data(table, config).await?;
53
    //     tokio::pin!(data);
54

55
    //     tx.send(Some(IngestionMessage::new_snapshotting_started(0_u64, 0)))
56
    //         .await
57
    //         .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;
58

59
    //     let mut seq_no = 1;
60
    //     while let Some(Ok(batch)) = data.next().await {
61
    //         let dozer_schema = map_schema_to_dozer(&batch.schema())
62
    //             .map_err(|e| ConnectorError::InternalError(Box::new(e)))
63
    //             .unwrap();
64
    //         for row in 0..batch.num_rows() {
65
    //             let fields = batch
66
    //                 .columns()
67
    //                 .iter()
68
    //                 .enumerate()
69
    //                 .map(|(col, column)| {
70
    //                     map_value_to_dozer_field(
71
    //                         column,
72
    //                         &row,
73
    //                         dozer_schema.fields.get(col).unwrap().name.as_str(),
74
    //                         &dozer_schema,
75
    //                     )
76
    //                     .unwrap()
77
    //                 })
78
    //                 .collect::<Vec<_>>();
79

80
    //             tx.send(Some(IngestionMessage::new_op(
81
    //                 0,
82
    //                 seq_no,
83
    //                 Operation::Insert {
84
    //                     new: Record {
85
    //                         schema_id: Some(SchemaIdentifier {
86
    //                             id: id as u32,
87
    //                             version: 0,
88
    //                         }),
89
    //                         values: fields,
90
    //                         lifetime: None,
91
    //                     },
92
    //                 },
93
    //             )))
94
    //             .await
95
    //             .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;
96
    //             seq_no += 1;
97
    //         }
98
    //     }
99

100
    //     tx.send(Some(IngestionMessage::new_snapshotting_done(0, seq_no)))
101
    //         .await
102
    //         .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;
103
    //     seq_no += 1;
104

105
    //     loop {
106
    //         let maybe_message = rx
107
    //             .recv()
108
    //             .await
109
    //             .ok_or(ConnectorError::ObjectStoreConnectorError(RecvError))?;
110
    //         match maybe_message {
111
    //             None => {
112
    //                 break;
113
    //             }
114
    //             Some(message) => {
115
    //                 ingestor
116
    //                     .handle_message(message)
117
    //                     .map_err(ConnectorError::IngestorError)?;
118
    //             }
119
    //         }
120
    //     }
121
    //     Ok(())
122
    // }
123

124
    async fn snapshot(
×
125
        &self,
×
126
        table_index: usize,
×
127
        table: &TableInfo,
×
128
        sender: Sender<Result<Option<IngestionMessageKind>, ObjectStoreConnectorError>>,
×
129
    ) -> Result<JoinHandle<(usize, HashMap<object_store::path::Path, DateTime<Utc>>)>, ConnectorError>
×
130
    {
×
131
        let params = self.store_config.table_params(&table.name)?;
×
132

133
        let ctx = SessionContext::new();
×
134

135
        let delta_table = if params.aws_region.is_none() {
×
136
            deltalake::open_table(&params.table_path).await.unwrap()
×
137
        } else {
138
            let storage_options = HashMap::from([
×
139
                (
×
140
                    s3_storage_options::AWS_REGION.to_string(),
×
141
                    params.aws_region.clone().unwrap(),
×
142
                ),
×
143
                (
×
144
                    s3_storage_options::AWS_ACCESS_KEY_ID.to_string(),
×
145
                    params.aws_access_key_id.clone().unwrap(),
×
146
                ),
×
147
                (
×
148
                    s3_storage_options::AWS_SECRET_ACCESS_KEY.to_string(),
×
149
                    params.aws_secret_access_key.clone().unwrap(),
×
150
                ),
×
151
            ]);
×
152

×
153
            deltalake::open_table_with_storage_options(&params.table_path, storage_options)
×
154
                .await
×
155
                .unwrap()
×
156
        };
157

158
        let h = tokio::spawn(async move {
×
159
            let data = ctx
×
160
                .read_table(Arc::new(delta_table))
×
161
                .unwrap()
×
162
                //.select_columns(&cols)?
×
163
                .execute_stream()
×
164
                .await
×
165
                .unwrap();
×
166

×
167
            // let (_, data) = DeltaOps(delta_table).load().await?;
×
168

×
169
            tokio::pin!(data);
×
170

171
            // self.ingestor
172
            //     .handle_message(IngestionMessage::new_snapshotting_started(0_u64, 0))
173
            //     .map_err(ConnectorError::IngestorError)?;
174

175
            while let Some(Ok(batch)) = data.next().await {
×
176
                let dozer_schema = map_schema_to_dozer(&batch.schema())
×
177
                    .map_err(|e| ConnectorError::InternalError(Box::new(e)))
×
178
                    .unwrap();
×
179
                for row in 0..batch.num_rows() {
×
180
                    let fields = batch
×
181
                        .columns()
×
182
                        .iter()
×
183
                        .enumerate()
×
184
                        .map(|(col, column)| {
×
185
                            map_value_to_dozer_field(
×
186
                                column,
×
187
                                row,
×
188
                                dozer_schema.fields.get(col).unwrap().name.as_str(),
×
189
                                &dozer_schema,
×
190
                            )
×
191
                            .unwrap()
×
192
                        })
×
193
                        .collect::<Vec<_>>();
×
194

×
195
                    let evt = Operation::Insert {
×
196
                        new: Record {
×
197
                            values: fields,
×
198
                            lifetime: None,
×
199
                        },
×
200
                    };
×
201

202
                    if let Err(e) = sender
×
203
                        .send(Ok(Some(IngestionMessageKind::OperationEvent {
×
204
                            table_index,
×
205
                            op: evt,
×
206
                        })))
×
207
                        .await
×
208
                    {
209
                        error!("Failed to send ingestion message: {}", e);
×
210
                    }
×
211

212
                    // self.ingestor
213
                    //     .handle_message(IngestionMessage::new_op(
214
                    //         0,
215
                    //         seq_no,
216
                    //         Operation::Insert {
217
                    //             new: Record {
218
                    //                 schema_id: Some(SchemaIdentifier {
219
                    //                     id: id as u32,
220
                    //                     version: 0,
221
                    //                 }),
222
                    //                 values: fields,
223
                    //                 lifetime: None,
224
                    //             },
225
                    //         },
226
                    //     ))
227
                    //     .map_err(ConnectorError::IngestorError)?;
228
                }
229
            }
230
            (table_index, HashMap::new())
×
231
        });
×
232

×
233
        // self.ingestor
×
234
        //     .handle_message(IngestionMessage::new_snapshotting_done(0, seq_no))
×
235
        //     .map_err(ConnectorError::IngestorError)?;
×
236

×
237
        Ok(h)
×
238
    }
×
239

240
    async fn ingest(
×
241
        &self,
×
242
        _table_index: usize,
×
243
        _table: &TableInfo,
×
244
        _sender: Sender<Result<Option<IngestionMessageKind>, ObjectStoreConnectorError>>,
×
245
    ) -> Result<(), ConnectorError> {
×
246
        Ok(())
×
247
    }
×
248
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc