• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5659039553

pending completion
5659039553

Pull #1792

github

chubei
chore: Change `make_from!` in `from_arrow` to func to improve readability
Pull Request #1792: chore: Change `make_from!` in `from_arrow` to func to improve readability

31 of 31 new or added lines in 4 files covered. (100.0%)

45385 of 58700 relevant lines covered (77.32%)

39368.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.62
/dozer-types/src/arrow_types/from_arrow.rs
1
use super::errors::FromArrowError;
2
use super::errors::FromArrowError::DateConversionError;
3
use super::errors::FromArrowError::DateTimeConversionError;
4
use super::errors::FromArrowError::DurationConversionError;
5
use super::errors::FromArrowError::FieldTypeNotSupported;
6
use super::errors::FromArrowError::TimeConversionError;
7
use super::to_arrow;
8
use crate::arrow_types::to_arrow::DOZER_SCHEMA_KEY;
9
use crate::json_types::JsonValue;
10
use crate::types::{
11
    Field as DozerField, FieldDefinition, FieldType, Record, Schema as DozerSchema, Schema,
12
    SourceDefinition,
13
};
14
use arrow::array;
15
use arrow::array::ArrayAccessor;
16
use arrow::array::{Array, ArrayRef};
17
use arrow::datatypes::{DataType, TimeUnit};
18
use arrow::ipc::writer::StreamWriter;
19
use arrow::record_batch::RecordBatch;
20
use arrow::row::SortField;
21

22
use crate::arrow_types::errors::FromArrowError::DeserializationError;
23
use log::error;
24
use std::str::FromStr;
25
use std::sync::Arc;
26

27
fn make_from<A: Array + 'static>(column: &Arc<dyn Array>, row: usize) -> DozerField
18,809✔
28
where
18,809✔
29
    for<'a> &'a A: ArrayAccessor,
18,809✔
30
    for<'a> DozerField: From<<&'a A as ArrayAccessor>::Item>,
18,809✔
31
{
18,809✔
32
    let array = column.as_any().downcast_ref::<A>();
18,809✔
33

34
    if let Some(r) = array {
18,809✔
35
        if r.is_null(row) {
18,809✔
36
            DozerField::Null
404✔
37
        } else {
38
            DozerField::from(r.value(row))
18,405✔
39
        }
40
    } else {
41
        DozerField::Null
×
42
    }
43
}
18,809✔
44

45
/// Expands to a `Result` holding `DozerField::Binary` with a copy of the bytes
/// at `$row` of `$column`, viewed as `$array_type`.
///
/// A null slot or a failed downcast both produce `Ok(DozerField::Null)`.
macro_rules! make_binary {
    ($array_type:ty, $column: ident, $row: ident) => {{
        match $column.as_any().downcast_ref::<$array_type>() {
            Some(bytes) if !bytes.is_null($row) => {
                Ok(DozerField::Binary(bytes.value($row).to_vec()))
            }
            _ => Ok(DozerField::Null),
        }
    }};
}
62

63
/// Expands to a `Result` holding the timestamp at `$row` of `$column`
/// (viewed as `$array_type`) converted with `value_as_datetime`.
///
/// A null slot or a failed downcast produce `Ok(DozerField::Null)`; a value
/// that `value_as_datetime` cannot represent produces
/// `Err(DateTimeConversionError)`.
macro_rules! make_timestamp {
    ($array_type:ty, $column: ident, $row: ident) => {{
        match $column.as_any().downcast_ref::<$array_type>() {
            Some(stamps) if !stamps.is_null($row) => stamps
                .value_as_datetime($row)
                .map(DozerField::from)
                .ok_or(DateTimeConversionError),
            _ => Ok(DozerField::Null),
        }
    }};
}
80

81
/// Expands to a `Result` holding the date at `$row` of `$column` (viewed as
/// `$array_type`) converted with `value_as_date`.
///
/// A null slot or a failed downcast produce `Ok(DozerField::Null)`; a value
/// that `value_as_date` cannot represent produces `Err(DateConversionError)`.
macro_rules! make_date {
    ($array_type:ty, $column: ident, $row: ident) => {{
        match $column.as_any().downcast_ref::<$array_type>() {
            Some(dates) if !dates.is_null($row) => dates
                .value_as_date($row)
                .map(DozerField::from)
                .ok_or(DateConversionError),
            _ => Ok(DozerField::Null),
        }
    }};
}
97

98
/// Expands to a `Result` holding the time-of-day at `$row` of `$column`
/// (viewed as `$array_type`) converted with `value_as_time`.
///
/// A null slot or a failed downcast produce `Ok(DozerField::Null)`; a value
/// that `value_as_time` cannot represent produces `Err(TimeConversionError)`.
macro_rules! make_time {
    ($array_type:ty, $column: ident, $row: ident) => {{
        match $column.as_any().downcast_ref::<$array_type>() {
            Some(times) if !times.is_null($row) => times
                .value_as_time($row)
                .map(DozerField::from)
                .ok_or(TimeConversionError),
            _ => Ok(DozerField::Null),
        }
    }};
}
114

115
/// Expands to a `Result` holding the duration at `$row` of `$column` (viewed
/// as `$array_type`) expressed as a nanosecond count.
///
/// A null slot or a failed downcast produce `Ok(DozerField::Null)`.
/// `Err(DurationConversionError)` is produced when `value_as_duration` yields
/// nothing, and also when the nanosecond count is unavailable
/// (`num_nanoseconds` returns `None`) — previously that second case panicked
/// on an `unwrap`.
macro_rules! make_duration {
    ($array_type:ty, $column: ident, $row: ident) => {{
        match $column.as_any().downcast_ref::<$array_type>() {
            Some(durations) if !durations.is_null($row) => durations
                .value_as_duration($row)
                // Report an unrepresentable nanosecond count as a conversion
                // error instead of panicking.
                .and_then(|d| d.num_nanoseconds())
                .map(DozerField::from)
                .ok_or(DurationConversionError),
            _ => Ok(DozerField::Null),
        }
    }};
}
133

134
/// Expands to a `Result` holding `DozerField::Text` with an owned copy of the
/// string at `$row` of `$column`, viewed as `$array_type`.
///
/// A null slot or a failed downcast both produce `Ok(DozerField::Null)`.
macro_rules! make_text {
    ($array_type:ty, $column: ident, $row: ident) => {{
        match $column.as_any().downcast_ref::<$array_type>() {
            Some(strings) if !strings.is_null($row) => {
                Ok(DozerField::Text(strings.value($row).to_string()))
            }
            _ => Ok(DozerField::Null),
        }
    }};
}
151

152
fn make_json(column: &ArrayRef, row: usize) -> Result<DozerField, FromArrowError> {
41✔
153
    let array = column.as_any().downcast_ref::<array::StringArray>();
41✔
154

155
    return if let Some(r) = array {
41✔
156
        let s: DozerField = if r.is_null(row) {
41✔
157
            DozerField::Null
×
158
        } else {
159
            match JsonValue::from_str(r.value(row)) {
41✔
160
                Ok(j) => DozerField::Json(j),
41✔
161
                Err(e) => return Err(DeserializationError(e)),
×
162
            }
163
        };
164
        Ok(s)
41✔
165
    } else {
166
        Ok(DozerField::Null)
×
167
    };
168
}
41✔
169

170
pub fn map_schema_to_dozer(
186✔
171
    schema: &arrow::datatypes::Schema,
186✔
172
) -> Result<DozerSchema, FromArrowError> {
186✔
173
    match schema.metadata.get(DOZER_SCHEMA_KEY) {
186✔
174
        Some(schema_val) => match serde_json::from_str(schema_val.as_str()) {
55✔
175
            Ok(s) => Ok(s),
55✔
176
            Err(e) => {
×
177
                error!("Dozer schema deserialization error {}", e.to_string());
×
178
                handle_with_dozer_schema(schema)
×
179
            }
180
        },
181
        None => handle_with_dozer_schema(schema),
131✔
182
    }
183
}
186✔
184

185
fn handle_with_dozer_schema(
131✔
186
    schema: &arrow::datatypes::Schema,
131✔
187
) -> Result<DozerSchema, FromArrowError> {
131✔
188
    let mut fields = vec![];
131✔
189
    for field in schema.fields() {
1,550✔
190
        let typ = map_arrow_to_dozer_type(field.data_type())?;
1,550✔
191

192
        fields.push(FieldDefinition {
1,550✔
193
            name: field.name().clone(),
1,550✔
194
            typ,
1,550✔
195
            nullable: field.is_nullable(),
1,550✔
196
            source: SourceDefinition::Dynamic,
1,550✔
197
        });
1,550✔
198
    }
199

200
    Ok(DozerSchema {
131✔
201
        fields,
131✔
202
        primary_index: vec![],
131✔
203
    })
131✔
204
}
131✔
205

206
pub fn map_arrow_to_dozer_type(dt: &DataType) -> Result<FieldType, FromArrowError> {
1,550✔
207
    match dt {
1,550✔
208
        DataType::Boolean => Ok(FieldType::Boolean),
40✔
209
        DataType::Time32(_)
210
        | DataType::Time64(_)
211
        | DataType::Duration(_)
212
        | DataType::Interval(_)
213
        | DataType::Int8
214
        | DataType::Int16
215
        | DataType::Int32
216
        | DataType::Int64 => Ok(FieldType::Int),
430✔
217
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
218
            Ok(FieldType::UInt)
130✔
219
        }
220
        DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(FieldType::Float),
299✔
221
        DataType::Timestamp(_, _) => Ok(FieldType::Timestamp),
169✔
222
        DataType::Date32 | DataType::Date64 => Ok(FieldType::Date),
52✔
223
        DataType::Binary | DataType::FixedSizeBinary(_) | DataType::LargeBinary => {
224
            Ok(FieldType::Binary)
104✔
225
        }
226
        DataType::Utf8 => Ok(FieldType::String),
196✔
227
        DataType::LargeUtf8 => Ok(FieldType::Text),
130✔
228
        // DataType::List(_) => {}
229
        // DataType::FixedSizeList(_, _) => {}
230
        // DataType::LargeList(_) => {}
231
        // DataType::Struct(_) => {}
232
        // DataType::Union(_, _, _) => {}
233
        // DataType::Dictionary(_, _) => {}
234
        // DataType::Decimal128(_, _) => {}
235
        // DataType::Decimal256(_, _) => {}
236
        _ => Err(FieldTypeNotSupported(format!("{dt:?}"))),
×
237
    }
238
}
1,550✔
239

240
pub fn map_value_to_dozer_field(
241
    column: &ArrayRef,
242
    row: usize,
243
    column_name: &str,
244
    schema: &Schema,
245
) -> Result<DozerField, FromArrowError> {
246
    match column.data_type() {
20,678✔
247
        DataType::Null => Ok(DozerField::Null),
×
248
        DataType::Boolean => Ok(make_from::<array::BooleanArray>(column, row)),
158✔
249
        DataType::Int8 => Ok(make_from::<array::Int8Array>(column, row)),
52✔
250
        DataType::Int16 => Ok(make_from::<array::Int16Array>(column, row)),
52✔
251
        DataType::Int32 => Ok(make_from::<array::Int32Array>(column, row)),
559✔
252
        DataType::Int64 => Ok(make_from::<array::Int64Array>(column, row)),
4,331✔
253
        DataType::UInt8 => Ok(make_from::<array::UInt8Array>(column, row)),
52✔
254
        DataType::UInt16 => Ok(make_from::<array::UInt16Array>(column, row)),
52✔
255
        DataType::UInt32 => Ok(make_from::<array::UInt32Array>(column, row)),
52✔
256
        DataType::UInt64 => Ok(make_from::<array::UInt64Array>(column, row)),
80✔
257
        DataType::Float16 => Ok(make_from::<array::Float32Array>(column, row)),
×
258
        DataType::Float32 => Ok(make_from::<array::Float32Array>(column, row)),
156✔
259
        DataType::Float64 => Ok(make_from::<array::Float64Array>(column, row)),
1,783✔
260
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
261
            make_timestamp!(array::TimestampMicrosecondArray, column, row)
91✔
262
        }
263
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
264
            make_timestamp!(array::TimestampMillisecondArray, column, row)
91✔
265
        }
266
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
267
            make_timestamp!(array::TimestampNanosecondArray, column, row)
823✔
268
        }
269
        DataType::Timestamp(TimeUnit::Second, _) => {
270
            make_timestamp!(array::TimestampSecondArray, column, row)
91✔
271
        }
272
        DataType::Date32 => make_date!(array::Date32Array, column, row),
91✔
273
        DataType::Date64 => make_date!(array::Date64Array, column, row),
95✔
274
        DataType::Time32(TimeUnit::Millisecond) => {
275
            make_time!(array::Time32MillisecondArray, column, row)
91✔
276
        }
277
        DataType::Time32(TimeUnit::Second) => make_time!(array::Time32SecondArray, column, row),
91✔
278
        DataType::Time64(TimeUnit::Microsecond) => {
279
            make_time!(array::Time64MicrosecondArray, column, row)
91✔
280
        }
281
        DataType::Time64(TimeUnit::Nanosecond) => {
282
            make_time!(array::Time64NanosecondArray, column, row)
91✔
283
        }
284
        DataType::Duration(TimeUnit::Microsecond) => {
285
            make_duration!(array::DurationMicrosecondArray, column, row)
×
286
        }
287
        DataType::Duration(TimeUnit::Millisecond) => {
288
            make_duration!(array::DurationMillisecondArray, column, row)
×
289
        }
290
        DataType::Duration(TimeUnit::Nanosecond) => {
291
            make_duration!(array::DurationNanosecondArray, column, row)
×
292
        }
293
        DataType::Duration(TimeUnit::Second) => {
294
            make_duration!(array::DurationSecondArray, column, row)
×
295
        }
296
        DataType::Binary => make_binary!(array::BinaryArray, column, row),
262✔
297
        DataType::FixedSizeBinary(_) => make_binary!(array::FixedSizeBinaryArray, column, row),
52✔
298
        DataType::LargeBinary => make_binary!(array::LargeBinaryArray, column, row),
52✔
299
        DataType::Utf8 => {
300
            for fd in schema.fields.clone().into_iter() {
57,536✔
301
                if fd.name == *column_name && fd.typ == FieldType::Json {
57,536✔
302
                    return make_json(column, row);
41✔
303
                }
57,495✔
304
            }
305
            Ok(make_from::<array::StringArray>(column, row))
11,482✔
306
        }
307
        DataType::LargeUtf8 => make_text!(array::LargeStringArray, column, row),
574✔
308
        // DataType::Interval(TimeUnit::) => make_from!(array::BooleanArray, x, x0),
309
        // DataType::List(_) => {}
310
        // DataType::FixedSizeList(_, _) => {}
311
        // DataType::LargeList(_) => {}
312
        // DataType::Struct(_) => {}
313
        // DataType::Union(_, _, _) => {}
314
        // DataType::Dictionary(_, _) => {}
315
        // DataType::Decimal128(_, _) => {}
316
        // DataType::Decimal256(_, _) => {}
317
        // DataType::Map(_, _) => {}
318
        _ => Err(FieldTypeNotSupported(column_name.to_string())),
×
319
    }
320
}
20,678✔
321

322
pub fn map_record_batch_to_dozer_records(
14✔
323
    batch: arrow::record_batch::RecordBatch,
14✔
324
    schema: &DozerSchema,
14✔
325
) -> Result<Vec<Record>, FromArrowError> {
14✔
326
    if schema.fields.len() != batch.num_columns() {
14✔
327
        return Err(FromArrowError::SchemaMismatchError(
×
328
            schema.fields.len(),
×
329
            batch.num_columns(),
×
330
        ));
×
331
    }
14✔
332
    let mut records = Vec::new();
14✔
333
    let columns = batch.columns();
14✔
334
    let batch_schema = batch.schema();
14✔
335
    let dozer_schema = map_schema_to_dozer(&batch_schema)?;
14✔
336
    let mut sort_fields = vec![];
14✔
337
    for x in schema.fields.iter() {
60✔
338
        let dt = to_arrow::map_field_type(x.typ);
60✔
339
        sort_fields.push(SortField::new(dt));
60✔
340
    }
60✔
341
    let num_rows = batch.num_rows();
14✔
342

343
    for r in 0..num_rows {
40✔
344
        let mut values = vec![];
40✔
345
        for (c, x) in columns.iter().enumerate() {
138✔
346
            let field = schema.fields.get(c).unwrap();
138✔
347
            let value = map_value_to_dozer_field(x, r, &field.name, &dozer_schema)?;
138✔
348
            values.push(value);
138✔
349
        }
350
        records.push(Record {
40✔
351
            values,
40✔
352
            lifetime: None,
40✔
353
        });
40✔
354
    }
355

356
    Ok(records)
14✔
357
}
14✔
358

359
pub fn serialize_record_batch(record: &RecordBatch) -> Vec<u8> {
13✔
360
    let buffer: Vec<u8> = Vec::new();
13✔
361
    let mut stream_writer = StreamWriter::try_new(buffer, &record.schema()).unwrap();
13✔
362
    stream_writer.write(record).unwrap();
13✔
363
    stream_writer.finish().unwrap();
13✔
364
    stream_writer.into_inner().unwrap()
13✔
365
}
13✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc