• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829268814

pending completion
4829268814

Pull #1516

github

GitHub
Merge 845b68ec1 into f2ab0e6ce
Pull Request #1516: Prepare v0.1.19

35090 of 44737 relevant lines covered (78.44%)

11496.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.98
/dozer-ingestion/src/connectors/kafka/debezium/schema.rs
1
use dozer_types::serde_json::Value;
2
use std::collections::HashMap;
3

4
use crate::connectors::kafka::debezium::stream_consumer::DebeziumSchemaStruct;
5

6
use crate::errors::DebeziumSchemaError;
7
use crate::errors::DebeziumSchemaError::{SchemaDefinitionNotFound, TypeNotSupported};
8
use dozer_types::types::{FieldDefinition, FieldType, Schema, SchemaIdentifier, SourceDefinition};
9

10
// Reference: https://debezium.io/documentation/reference/0.9/connectors/postgresql.html
11
pub fn map_type(schema: &DebeziumSchemaStruct) -> Result<FieldType, DebeziumSchemaError> {
13✔
12
    match schema.name.clone() {
13✔
13
        None => match schema.r#type.clone() {
8✔
14
            Value::String(typ) => match typ.as_str() {
8✔
15
                "int" | "int8" | "int16" | "int32" | "int64" => Ok(FieldType::Int),
8✔
16
                "string" => Ok(FieldType::String),
6✔
17
                "bytes" => Ok(FieldType::Binary),
4✔
18
                "float32" | "float64" | "double" => Ok(FieldType::Float),
3✔
19
                "boolean" => Ok(FieldType::Boolean),
2✔
20
                _ => Err(TypeNotSupported(typ)),
1✔
21
            },
22
            _ => Err(TypeNotSupported("Unexpected value type".to_string())),
×
23
        },
24
        Some(name) => match name.as_str() {
5✔
25
            "io.debezium.time.MicroTime"
5✔
26
            | "io.debezium.time.Timestamp"
4✔
27
            | "io.debezium.time.MicroTimestamp"
4✔
28
            | "org.apache.kafka.connect.data.Time"
4✔
29
            | "org.apache.kafka.connect.data.Timestamp" => Ok(FieldType::Timestamp),
4✔
30
            "io.debezium.time.Date" | "org.apache.kafka.connect.data.Date" => Ok(FieldType::Date),
4✔
31
            "org.apache.kafka.connect.data.Decimal" | "io.debezium.data.VariableScaleDecimal" => {
3✔
32
                Ok(FieldType::Decimal)
1✔
33
            }
34
            "io.debezium.data.Json" => Ok(FieldType::Json),
2✔
35
            _ => Err(TypeNotSupported(name)),
1✔
36
        },
37
    }
38
}
13✔
39

40
pub fn map_schema<'a>(
2✔
41
    schema: &'a DebeziumSchemaStruct,
2✔
42
    key_schema: &'a DebeziumSchemaStruct,
2✔
43
) -> Result<(Schema, HashMap<String, &'a DebeziumSchemaStruct>), DebeziumSchemaError> {
2✔
44
    let pk_fields = match &key_schema.fields {
2✔
45
        None => vec![],
×
46
        Some(fields) => fields.iter().map(|f| f.field.clone().unwrap()).collect(),
2✔
47
    };
48

49
    match &schema.fields {
2✔
50
        None => Err(SchemaDefinitionNotFound),
×
51
        Some(fields) => {
2✔
52
            let new_schema_struct = fields.iter().find(|f| {
2✔
53
                if let Some(val) = f.field.clone() {
2✔
54
                    val == *"after"
2✔
55
                } else {
56
                    false
×
57
                }
58
            });
2✔
59

60
            if let Some(schema) = new_schema_struct {
2✔
61
                let mut pk_keys_indexes = vec![];
2✔
62
                let mut fields_schema_map: HashMap<String, &DebeziumSchemaStruct> = HashMap::new();
2✔
63

64
                let defined_fields: Result<Vec<FieldDefinition>, _> = match &schema.fields {
2✔
65
                    None => Ok(vec![]),
1✔
66
                    Some(fields) => fields
1✔
67
                        .iter()
1✔
68
                        .enumerate()
1✔
69
                        .map(|(idx, f)| {
1✔
70
                            let typ = map_type(f)?;
2✔
71
                            let name = f.field.clone().unwrap();
2✔
72
                            if pk_fields.contains(&name) {
2✔
73
                                pk_keys_indexes.push(idx);
1✔
74
                            }
1✔
75
                            fields_schema_map.insert(name.clone(), f);
2✔
76
                            Ok(FieldDefinition {
2✔
77
                                name,
2✔
78
                                typ,
2✔
79
                                nullable: f.optional.map_or(false, |o| o),
2✔
80
                                source: SourceDefinition::Dynamic,
2✔
81
                            })
2✔
82
                        })
2✔
83
                        .collect(),
1✔
84
                };
85

86
                Ok((
87
                    Schema {
88
                        identifier: Some(SchemaIdentifier { id: 1, version: 1 }),
2✔
89
                        fields: defined_fields?,
2✔
90
                        primary_index: pk_keys_indexes,
2✔
91
                    },
2✔
92
                    fields_schema_map,
2✔
93
                ))
94
            } else {
95
                Err(SchemaDefinitionNotFound)
×
96
            }
97
        }
98
    }
99
}
2✔
100

101
#[cfg(test)]
mod tests {
    use crate::connectors::kafka::debezium::schema::{map_schema, map_type};
    use crate::connectors::kafka::debezium::stream_consumer::DebeziumSchemaStruct;
    use crate::errors::DebeziumSchemaError;
    use crate::errors::DebeziumSchemaError::{SchemaDefinitionNotFound, TypeNotSupported};
    use dozer_types::serde_json::Value;
    use dozer_types::types::{
        FieldDefinition, FieldType, Schema, SchemaIdentifier, SourceDefinition,
    };

    /// Builds a fixture schema node.
    ///
    /// `version` and `parameters` are always `None`, and `optional` is always
    /// `Some(..)`, as in every fixture used by these tests.
    fn node(
        typ: &str,
        fields: Option<Vec<DebeziumSchemaStruct>>,
        optional: bool,
        name: Option<&str>,
        field: Option<&str>,
    ) -> DebeziumSchemaStruct {
        DebeziumSchemaStruct {
            r#type: Value::String(typ.to_string()),
            fields,
            optional: Some(optional),
            name: name.map(str::to_string),
            field: field.map(str::to_string),
            version: None,
            parameters: None,
        }
    }

    /// A named column with no child fields and no semantic name.
    fn leaf(typ: &str, optional: bool, field: &str) -> DebeziumSchemaStruct {
        node(typ, None, optional, None, Some(field))
    }

    /// An envelope whose only field is `after`, holding the given columns.
    fn envelope(after_columns: Option<Vec<DebeziumSchemaStruct>>) -> DebeziumSchemaStruct {
        let after = node("after", after_columns, false, None, Some("after"));
        node("empty", Some(vec![after]), false, None, Some("struct"))
    }

    #[test]
    fn test_it_fails_when_schema_empty() {
        let value_schema = node("empty", None, false, None, None);
        let key = node("before", None, false, None, None);

        let err = map_schema(&value_schema, &key).unwrap_err();
        assert_eq!(err, SchemaDefinitionNotFound);
    }

    #[test]
    fn test_it_converts_schema() {
        let value_schema = envelope(Some(vec![
            leaf("int32", false, "id"),
            leaf("string", true, "name"),
        ]));
        let key = node(
            "-",
            Some(vec![leaf("int32", false, "id")]),
            false,
            None,
            None,
        );

        let (parsed, _) = map_schema(&value_schema, &key).unwrap();

        let id_column = FieldDefinition {
            name: "id".to_string(),
            typ: FieldType::Int,
            nullable: false,
            source: SourceDefinition::Dynamic,
        };
        let name_column = FieldDefinition {
            name: "name".to_string(),
            typ: FieldType::String,
            nullable: true,
            source: SourceDefinition::Dynamic,
        };
        let want = Schema {
            identifier: Some(SchemaIdentifier { id: 1, version: 1 }),
            fields: vec![id_column, name_column],
            primary_index: vec![0],
        };
        assert_eq!(parsed, want);
    }

    #[test]
    fn test_it_converts_empty_schema() {
        let value_schema = envelope(None);
        let key = node("-", Some(vec![]), false, None, None);

        let (parsed, _) = map_schema(&value_schema, &key).unwrap();

        let want = Schema {
            identifier: Some(SchemaIdentifier { id: 1, version: 1 }),
            fields: vec![],
            primary_index: vec![],
        };
        assert_eq!(parsed, want);
    }

    #[test]
    fn test_map_type() {
        // (primitive type, semantic name, expected mapping)
        let cases: Vec<(&str, Option<&str>, Result<FieldType, DebeziumSchemaError>)> = vec![
            ("int8", None, Ok(FieldType::Int)),
            ("string", None, Ok(FieldType::String)),
            ("bytes", None, Ok(FieldType::Binary)),
            ("float32", None, Ok(FieldType::Float)),
            ("boolean", None, Ok(FieldType::Boolean)),
            (
                "not found",
                None,
                Err(TypeNotSupported("not found".to_string())),
            ),
            (
                "int8",
                Some("io.debezium.time.MicroTime"),
                Ok(FieldType::Timestamp),
            ),
            ("int8", Some("io.debezium.time.Date"), Ok(FieldType::Date)),
            (
                "int8",
                Some("org.apache.kafka.connect.data.Decimal"),
                Ok(FieldType::Decimal),
            ),
            ("string", Some("io.debezium.data.Json"), Ok(FieldType::Json)),
            (
                "string",
                Some("not existing"),
                Err(TypeNotSupported("not existing".to_string())),
            ),
        ];

        for (typ, name, expected) in cases {
            let schema = node(typ, None, false, name, None);
            assert_eq!(map_type(&schema), expected, "type {:?}, name {:?}", typ, name);
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc