• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4829321272

pending completion
4829321272

Pull #1515

github

GitHub
Merge b6d982211 into f2ab0e6ce
Pull Request #1515: feat: Run migration only when necessary

193 of 193 new or added lines in 11 files covered. (100.0%)

35565 of 45252 relevant lines covered (78.59%)

16462.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.29
/dozer-orchestrator/src/simple/migration.rs
1
use dozer_api::generator::protoc::generator::ProtoGenerator;
2
use dozer_cache::dozer_log::{
3
    home_dir::{HomeDir, MigrationId},
4
    schemas::{load_schema, write_schema, MigrationSchema},
5
};
6
use dozer_types::{
7
    models::api_endpoint::{ApiEndpoint, ApiIndex},
8
    types::{Schema, SchemaIdentifier},
9
};
10

11
use crate::errors::MigrationError;
12

13
pub fn needs_migration(
6✔
14
    home_dir: &HomeDir,
6✔
15
    endpoint_name: &str,
6✔
16
    schema: &MigrationSchema,
6✔
17
) -> Result<Option<MigrationId>, MigrationError> {
6✔
18
    let migration_path = home_dir
6✔
19
        .find_latest_migration_path(endpoint_name)
6✔
20
        .map_err(|(path, error)| MigrationError::FileSystem(path, error))?;
6✔
21
    let Some(migration_path) = migration_path else {
6✔
22
        return Ok(Some(MigrationId::first()));
6✔
23
    };
24

25
    if migration_path.log_path.exists() {
×
26
        return Ok(Some(migration_path.id.next()));
×
27
    }
×
28

29
    let existing_schema = load_schema(&migration_path.schema_path)
×
30
        .map_err(MigrationError::CannotLoadExistingSchema)?;
×
31
    if existing_schema == *schema {
×
32
        Ok(None)
×
33
    } else {
34
        Ok(Some(migration_path.id.next()))
×
35
    }
36
}
6✔
37

38
pub fn create_migration(
6✔
39
    home_dir: &HomeDir,
6✔
40
    endpoint_name: &str,
6✔
41
    migration_id: MigrationId,
6✔
42
    schema: &MigrationSchema,
6✔
43
) -> Result<(), MigrationError> {
6✔
44
    let migration_path = home_dir
6✔
45
        .create_migration_dir_all(endpoint_name, migration_id)
6✔
46
        .map_err(|(path, error)| MigrationError::FileSystem(path, error))?;
6✔
47

48
    write_schema(schema, &migration_path.schema_path).map_err(MigrationError::CannotWriteSchema)?;
6✔
49

50
    let proto_folder_path = &migration_path.api_dir;
6✔
51
    ProtoGenerator::generate(proto_folder_path, endpoint_name, schema)?;
6✔
52

53
    let mut resources = Vec::new();
6✔
54
    resources.push(endpoint_name);
6✔
55

56
    let common_resources = ProtoGenerator::copy_common(proto_folder_path)?;
6✔
57

58
    // Copy common service to be included in descriptor.
59
    resources.extend(common_resources.iter().map(|str| str.as_str()));
24✔
60

6✔
61
    // Generate a descriptor based on all proto files generated within sink.
6✔
62
    ProtoGenerator::generate_descriptor(
6✔
63
        proto_folder_path,
6✔
64
        &migration_path.descriptor_path,
6✔
65
        &resources,
6✔
66
    )?;
6✔
67

68
    Ok(())
6✔
69
}
6✔
70

71
pub fn modify_schema(
6✔
72
    schema: &Schema,
6✔
73
    api_endpoint: &ApiEndpoint,
6✔
74
) -> Result<Schema, MigrationError> {
6✔
75
    let mut schema = schema.clone();
6✔
76
    // Generated Cache index based on api_index
77
    let configured_index =
6✔
78
        create_primary_indexes(&schema, &api_endpoint.index.to_owned().unwrap_or_default())?;
6✔
79
    // Generated schema in SQL
80
    let upstream_index = schema.primary_index.clone();
6✔
81

82
    let index = match (configured_index.is_empty(), upstream_index.is_empty()) {
6✔
83
        (true, true) => vec![],
6✔
84
        (true, false) => upstream_index,
×
85
        (false, true) => configured_index,
×
86
        (false, false) => {
87
            if !upstream_index.eq(&configured_index) {
×
88
                return Err(MigrationError::MismatchPrimaryKey {
×
89
                    endpoint_name: api_endpoint.name.clone(),
×
90
                    expected: get_field_names(&schema, &upstream_index),
×
91
                    actual: get_field_names(&schema, &configured_index),
×
92
                });
×
93
            }
×
94
            configured_index
×
95
        }
96
    };
97

98
    schema.primary_index = index;
6✔
99

6✔
100
    schema.identifier = Some(SchemaIdentifier { id: 0, version: 1 });
6✔
101
    Ok(schema)
6✔
102
}
6✔
103

104
fn get_field_names(schema: &Schema, indexes: &[usize]) -> Vec<String> {
×
105
    indexes
×
106
        .iter()
×
107
        .map(|idx| schema.fields[*idx].name.to_owned())
×
108
        .collect()
×
109
}
×
110

111
fn create_primary_indexes(
6✔
112
    schema: &Schema,
6✔
113
    api_index: &ApiIndex,
6✔
114
) -> Result<Vec<usize>, MigrationError> {
6✔
115
    let mut primary_index = Vec::new();
6✔
116
    for name in api_index.primary_key.iter() {
6✔
117
        let idx = schema
×
118
            .fields
×
119
            .iter()
×
120
            .position(|fd: &dozer_types::types::FieldDefinition| fd.name == name.clone())
×
121
            .map_or(Err(MigrationError::FieldNotFound(name.to_owned())), Ok)?;
×
122

123
        primary_index.push(idx);
×
124
    }
125
    Ok(primary_index)
6✔
126
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc