• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

divviup / divviup-api / 11293827300

11 Oct 2024 02:06PM UTC coverage: 55.932% (+0.02%) from 55.917%
11293827300

Pull #1339

github

web-flow
Merge 4c9413d7f into 7cf458010
Pull Request #1339: Bump primereact from 10.8.3 to 10.8.4 in /app

3932 of 7030 relevant lines covered (55.93%)

103.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.44
/migration/src/bin/migrate_to.rs
1
use clap::{builder::PossibleValuesParser, command, Parser, ValueEnum};
2
use sea_orm_migration::{
3
    sea_orm::{ConnectOptions, Database, DatabaseConnection, DbErr, EntityTrait, QueryOrder},
4
    seaql_migrations, MigratorTrait, SchemaManager,
5
};
6
use std::cmp::Ordering;
7
use tracing::{error, info};
8
use tracing_subscriber::EnvFilter;
9

10
use migration::Migrator;
11

12
#[derive(Copy, Clone, ValueEnum, Debug)]
13
enum Direction {
14
    Up,
15
    Down,
16
}
17

18
/// Wraps the SeaORM migration library to provide additional features. Lets
19
/// you bring migrations up or down to a particular version. Allows dry-run
20
/// of migrations.
21
#[derive(Parser, Debug)]
22
#[command(about, version)]
23
struct Args {
24
    #[arg(value_enum)]
25
    direction: Direction,
×
26
    #[arg(value_parser = available_migrations())]
27
    target_version: String,
×
28
    #[arg(short, long)]
29
    dry_run: bool,
×
30
    #[arg(short = 'u', long, env = "DATABASE_URL")]
31
    database_url: String,
×
32
}
33

34
#[async_std::main]
×
35
async fn main() -> Result<(), Error> {
36
    let args = Args::parse();
37
    tracing_subscriber::fmt()
38
        .with_env_filter(EnvFilter::from_default_env())
39
        .with_ansi(false)
40
        .init();
41

42
    let db = Database::connect(ConnectOptions::new(args.database_url)).await?;
43
    check_database_is_compatible::<Migrator>(&db).await?;
44
    match args.direction {
45
        Direction::Up => migrate_up::<Migrator>(&db, args.dry_run, &args.target_version).await?,
46
        Direction::Down => {
47
            migrate_down::<Migrator>(&db, args.dry_run, &args.target_version).await?
48
        }
49
    }
50
    Ok(())
51
}
52

53
/// Checks that the database is compatible with the given migrator. Applied
54
/// migrations must be a subset of the migrations available by the MigratorTrait
55
/// for this CLI to be operated safely.
56
async fn check_database_is_compatible<M: MigratorTrait>(
7✔
57
    db: &DatabaseConnection,
7✔
58
) -> Result<(), Error> {
7✔
59
    // If the database is uninitialized, we can continue. The migrator will
7✔
60
    // initialize the database for us.
7✔
61
    if !has_migrations_table(db).await? {
17✔
62
        return Ok(());
1✔
63
    }
6✔
64

6✔
65
    let migrations: Vec<String> = M::migrations().iter().map(|m| m.name().into()).collect();
27✔
66
    let applied_migrations = applied_migrations(db).await?;
22✔
67

68
    let print_error = || {
6✔
69
        error!("expected migrations: {:?}", migrations);
4✔
70
        error!("present migrations: {:?}", applied_migrations);
4✔
71
    };
4✔
72
    for (i, applied) in applied_migrations.iter().enumerate() {
19✔
73
        match migrations.get(i) {
19✔
74
            Some(migration) if migration != applied => {
18✔
75
                print_error();
3✔
76
                return Err(Error::DbNotCompatible);
3✔
77
            }
78
            None => {
79
                print_error();
1✔
80
                return Err(Error::DbNotCompatible);
1✔
81
            }
82
            _ => {}
15✔
83
        }
84
    }
85
    Ok(())
2✔
86
}
7✔
87

88
async fn migrate_up<M: MigratorTrait>(
15✔
89
    db: &DatabaseConnection,
15✔
90
    dry_run: bool,
15✔
91
    target: &str,
15✔
92
) -> Result<(), Error> {
15✔
93
    let target_index =
14✔
94
        migration_index::<M>(target).ok_or(Error::MigrationNotFound(target.to_string()))?;
15✔
95

96
    let (migrations_range, num_migrations) = match latest_applied_migration(db).await {
46✔
97
        Ok(latest_migration) => {
6✔
98
            let latest_index =
6✔
99
                migration_index::<M>(&latest_migration).ok_or(Error::DbMigrationNotFound)?;
6✔
100
            match target_index.cmp(&latest_index) {
6✔
101
                Ordering::Less => return Err(Error::VersionTooOld(target.to_string())),
1✔
102
                Ordering::Equal => {
103
                    info!("no action taken, already at desired version");
2✔
104
                    return Ok(());
2✔
105
                }
106
                Ordering::Greater => (
3✔
107
                    (latest_index + 1)..=target_index,
3✔
108
                    target_index - latest_index,
3✔
109
                ),
3✔
110
            }
111
        }
112
        Err(Error::DbNotInitialized) => (
8✔
113
            0usize..=target_index,
8✔
114
            // The migration API takes "number of migrations to apply". If we have an
8✔
115
            // uninitialized database, and we want to apply the first migration (index 0),
8✔
116
            // then we still have to apply at least one migration.
8✔
117
            target_index + 1,
8✔
118
        ),
8✔
119
        Err(err) => return Err(err),
×
120
    };
121

122
    info!(
11✔
123
        "executing {num_migrations} up migration(s) to reach {target}: {:?}",
×
124
        Migrator::migrations()[migrations_range]
×
125
            .iter()
×
126
            .map(|m| m.name())
×
127
            .collect::<Vec<_>>()
×
128
    );
129
    if !dry_run {
11✔
130
        M::up(db, Some(u32::try_from(num_migrations)?))
11✔
131
            .await
392✔
132
            .map_err(Error::from)?
11✔
133
    }
×
134
    Ok(())
11✔
135
}
15✔
136

137
async fn migrate_down<M: MigratorTrait>(
6✔
138
    db: &DatabaseConnection,
6✔
139
    dry_run: bool,
6✔
140
    target: &str,
6✔
141
) -> Result<(), Error> {
6✔
142
    let latest_index = migration_index::<M>(&latest_applied_migration(db).await?)
6✔
143
        .ok_or(Error::DbMigrationNotFound)?;
5✔
144
    let target_index =
4✔
145
        migration_index::<M>(target).ok_or(Error::MigrationNotFound(target.to_string()))?;
5✔
146

147
    let num_migrations = match latest_index.cmp(&target_index) {
4✔
148
        Ordering::Less => return Err(Error::VersionTooNew(target.to_string())),
1✔
149
        Ordering::Equal => {
150
            info!("no action taken, already at desired version");
1✔
151
            return Ok(());
1✔
152
        }
153
        Ordering::Greater => latest_index - target_index,
2✔
154
    };
2✔
155

2✔
156
    info!(
2✔
157
        "executing {num_migrations} down migration(s) to reach {target}: {:?}",
×
158
        Migrator::migrations()[(target_index + 1)..=(latest_index)]
×
159
            .iter()
×
160
            .rev()
×
161
            .map(|m| m.name())
×
162
            .collect::<Vec<_>>()
×
163
    );
164
    if !dry_run {
2✔
165
        M::down(db, Some(u32::try_from(num_migrations)?))
1✔
166
            .await
×
167
            .map_err(Error::from)?
1✔
168
    }
1✔
169
    Ok(())
2✔
170
}
6✔
171

172
fn migration_index<M: MigratorTrait>(version: &str) -> Option<usize> {
31✔
173
    M::migrations().iter().position(|m| m.name() == version)
112✔
174
}
31✔
175

176
async fn has_migrations_table(db: &DatabaseConnection) -> Result<bool, Error> {
27✔
177
    Ok(SchemaManager::new(db).has_table("seaql_migrations").await?)
52✔
178
}
27✔
179

180
async fn latest_applied_migration(db: &DatabaseConnection) -> Result<String, Error> {
20✔
181
    if !has_migrations_table(db).await? {
35✔
182
        return Err(Error::DbNotInitialized);
9✔
183
    }
11✔
184
    Ok(seaql_migrations::Entity::find()
11✔
185
        .order_by_desc(seaql_migrations::Column::Version)
11✔
186
        .one(db)
11✔
187
        .await?
17✔
188
        // The migrations table exists, but no migrations have been applied.
189
        .ok_or(Error::DbNotInitialized)?
11✔
190
        .version)
191
}
20✔
192

193
async fn applied_migrations(db: &DatabaseConnection) -> Result<Vec<String>, Error> {
26✔
194
    Ok(seaql_migrations::Entity::find()
26✔
195
        .order_by_asc(seaql_migrations::Column::Version)
26✔
196
        .all(db)
26✔
197
        .await?
90✔
198
        .into_iter()
26✔
199
        .map(|m| m.version)
99✔
200
        .collect())
26✔
201
}
26✔
202

203
#[derive(Debug, thiserror::Error)]
204
enum Error {
205
    #[error("DB error: {0}")]
206
    Db(#[from] DbErr),
207
    #[error("DB is not initialized with migrations table")]
208
    DbNotInitialized,
209
    #[error("migration applied to DB is not found in available migrations")]
210
    DbMigrationNotFound,
211
    #[error("migration version {0} not found in avaliable migrations")]
212
    MigrationNotFound(String),
213
    #[error("migration version {0} is older than the latest applied migration")]
214
    VersionTooOld(String),
215
    #[error("migration version {0} is newer than the latest applied migration")]
216
    VersionTooNew(String),
217
    #[error("error calculating number of migrations, too many migrations?: {0}")]
218
    Overflow(#[from] std::num::TryFromIntError),
219
    #[error("applied migrations do not match migrations present in this tool")]
220
    DbNotCompatible,
221
}
222

223
fn available_migrations() -> PossibleValuesParser {
×
224
    PossibleValuesParser::new(
×
225
        // Leak memory to give migration names 'static lifetime, so clap can
×
226
        // use them.
×
227
        Migrator::migrations()
×
228
            .into_iter()
×
229
            .map(|m| Box::leak(Box::new(m.name().to_owned())) as &'static str)
×
230
            .collect::<Vec<_>>(),
×
231
    )
×
232
}
×
233

234
#[cfg(test)]
235
mod tests {
236
    use std::{sync::Once, time::SystemTime};
237

238
    use super::*;
239
    use sea_orm::{ActiveModelTrait, ActiveValue};
240
    use sea_orm_migration::prelude::*;
241

242
    macro_rules! test_migration {
243
        ($name:ident, $table_name:ident) => {
244
            #[allow(non_camel_case_types)]
245
            struct $name;
246

247
            impl MigrationName for $name {
248
                fn name(&self) -> &str {
291✔
249
                    stringify!($name)
291✔
250
                }
291✔
251
            }
252

253
            #[async_trait::async_trait]
254
            impl MigrationTrait for $name {
255
                async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
36✔
256
                    manager
36✔
257
                        .create_table(
36✔
258
                            Table::create()
36✔
259
                                .table($table_name::Table)
36✔
260
                                .col(ColumnDef::new($table_name::Id).uuid().primary_key())
36✔
261
                                .to_owned(),
36✔
262
                        )
36✔
263
                        .await?;
108✔
264
                    Ok(())
36✔
265
                }
72✔
266
                async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
2✔
267
                    manager
2✔
268
                        .drop_table(Table::drop().table($table_name::Table).to_owned())
2✔
269
                        .await
×
270
                }
4✔
271
            }
272

273
            #[derive(Iden)]
74✔
274
            enum $table_name {
275
                Table,
276
                Id,
277
            }
278
        };
279
    }
280
    test_migration!(m20230101_000000_test_migration_1, TestTable1);
281
    test_migration!(m20230201_000000_test_migration_2, TestTable2);
282
    test_migration!(m20230301_000000_test_migration_3, TestTable3);
283
    test_migration!(m20230401_000000_test_migration_4, TestTable4);
284
    test_migration!(m20230501_000000_test_migration_5, TestTable5);
285

286
    struct TestMigrator;
287

288
    #[async_trait::async_trait]
289
    impl MigratorTrait for TestMigrator {
290
        fn migrations() -> Vec<Box<dyn MigrationTrait>> {
46✔
291
            vec![
46✔
292
                Box::new(m20230101_000000_test_migration_1),
46✔
293
                Box::new(m20230201_000000_test_migration_2),
46✔
294
                Box::new(m20230301_000000_test_migration_3),
46✔
295
                Box::new(m20230401_000000_test_migration_4),
46✔
296
                Box::new(m20230501_000000_test_migration_5),
46✔
297
            ]
46✔
298
        }
46✔
299
    }
300

301
    fn all_migrations() -> Vec<&'static str> {
17✔
302
        vec![
17✔
303
            "m20230101_000000_test_migration_1",
17✔
304
            "m20230201_000000_test_migration_2",
17✔
305
            "m20230301_000000_test_migration_3",
17✔
306
            "m20230401_000000_test_migration_4",
17✔
307
            "m20230501_000000_test_migration_5",
17✔
308
        ]
17✔
309
    }
17✔
310

311
    async fn test_database() -> DatabaseConnection {
8✔
312
        Database::connect(ConnectOptions::new("sqlite::memory:".to_string()))
8✔
313
            .await
22✔
314
            .unwrap()
8✔
315
    }
8✔
316

317
    #[async_std::test]
2✔
318
    async fn migrate_up_latest() {
319
        install_tracing_subscriber();
320
        let db = test_database().await;
321

322
        // To latest
323
        migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
324
            .await
325
            .unwrap();
326
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
327

328
        // Ensure no-op
329
        migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
330
            .await
331
            .unwrap();
332
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
333
    }
334

335
    #[async_std::test]
2✔
336
    async fn migrate_up_works() {
337
        install_tracing_subscriber();
338
        let db = test_database().await;
339

340
        // To first
341
        migrate_up::<TestMigrator>(&db, false, "m20230101_000000_test_migration_1")
342
            .await
343
            .unwrap();
344
        assert_eq!(
345
            applied_migrations(&db).await.unwrap(),
346
            vec!["m20230101_000000_test_migration_1"]
347
        );
348

349
        // Dry run
350
        migrate_up::<TestMigrator>(&db, true, "m20230101_000000_test_migration_1")
351
            .await
352
            .unwrap();
353
        assert_eq!(
354
            applied_migrations(&db).await.unwrap(),
355
            vec!["m20230101_000000_test_migration_1"]
356
        );
357

358
        // To third
359
        migrate_up::<TestMigrator>(&db, false, "m20230301_000000_test_migration_3")
360
            .await
361
            .unwrap();
362
        assert_eq!(
363
            applied_migrations(&db).await.unwrap(),
364
            all_migrations()[..3]
365
        );
366

367
        // To non-existent
368
        let result = migrate_up::<TestMigrator>(&db, false, "foobar").await;
369
        assert!(matches!(result, Err(Error::MigrationNotFound(_))));
370
        assert_eq!(
371
            applied_migrations(&db).await.unwrap(),
372
            all_migrations()[..3]
373
        );
374

375
        // To old version
376
        let result =
377
            migrate_up::<TestMigrator>(&db, false, "m20230101_000000_test_migration_1").await;
378
        assert!(matches!(result, Err(Error::VersionTooOld(_))));
379
        assert_eq!(
380
            applied_migrations(&db).await.unwrap(),
381
            all_migrations()[..3]
382
        );
383
    }
384

385
    #[async_std::test]
2✔
386
    async fn migrate_down_works() {
387
        install_tracing_subscriber();
388
        let db = test_database().await;
389
        let result =
390
            migrate_down::<TestMigrator>(&db, false, "m20230401_000000_test_migration_4").await;
391
        assert!(matches!(result, Err(Error::DbNotInitialized)));
392

393
        // Fail if DB not initialized
394

395
        // To latest
396
        migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
397
            .await
398
            .unwrap();
399
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
400

401
        // Ensure no-op
402
        migrate_down::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
403
            .await
404
            .unwrap();
405
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
406

407
        // Dry-run
408
        migrate_down::<TestMigrator>(&db, true, "m20230301_000000_test_migration_3")
409
            .await
410
            .unwrap();
411
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
412

413
        // To third
414
        migrate_down::<TestMigrator>(&db, false, "m20230301_000000_test_migration_3")
415
            .await
416
            .unwrap();
417
        assert_eq!(
418
            applied_migrations(&db).await.unwrap(),
419
            all_migrations()[..3]
420
        );
421

422
        // To newer version
423
        let result =
424
            migrate_down::<TestMigrator>(&db, false, "m20230401_000000_test_migration_4").await;
425
        assert!(matches!(result, Err(Error::VersionTooNew(_))));
426
        assert_eq!(
427
            applied_migrations(&db).await.unwrap(),
428
            all_migrations()[..3]
429
        );
430

431
        // To non-existent version
432
        let result = migrate_down::<TestMigrator>(&db, false, "foobar").await;
433
        assert!(matches!(result, Err(Error::MigrationNotFound(_))));
434
        assert_eq!(
435
            applied_migrations(&db).await.unwrap(),
436
            all_migrations()[..3]
437
        );
438

439
        // Upgrade back to fourth, ensure we can still upgrade again.
440
        migrate_up::<TestMigrator>(&db, false, "m20230401_000000_test_migration_4")
441
            .await
442
            .unwrap();
443
        assert_eq!(
444
            applied_migrations(&db).await.unwrap(),
445
            all_migrations()[..4]
446
        );
447
    }
448

449
    #[async_std::test]
2✔
450
    async fn accept_compatible_db() {
451
        install_tracing_subscriber();
452
        let db = test_database().await;
453
        check_database_is_compatible::<TestMigrator>(&db)
454
            .await
455
            .unwrap();
456

457
        // To third
458
        migrate_up::<TestMigrator>(&db, false, "m20230301_000000_test_migration_3")
459
            .await
460
            .unwrap();
461
        assert_eq!(
462
            applied_migrations(&db).await.unwrap(),
463
            all_migrations()[..3]
464
        );
465
        check_database_is_compatible::<TestMigrator>(&db)
466
            .await
467
            .unwrap();
468

469
        // To latest
470
        migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
471
            .await
472
            .unwrap();
473
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
474
        check_database_is_compatible::<TestMigrator>(&db)
475
            .await
476
            .unwrap();
477
    }
478

479
    #[async_std::test]
2✔
480
    async fn reject_incompatible_db_using_wrong_migrator() {
481
        install_tracing_subscriber();
482

483
        struct AnotherTestMigrator;
484
        test_migration!(m20240101_000000_test_migration_1, AnotherTestTable1);
485
        test_migration!(m20240201_000000_test_migration_2, AnotherTestTable2);
486
        #[async_trait::async_trait]
487
        impl MigratorTrait for AnotherTestMigrator {
488
            fn migrations() -> Vec<Box<dyn MigrationTrait>> {
3✔
489
                vec![
3✔
490
                    Box::new(m20240101_000000_test_migration_1),
3✔
491
                    Box::new(m20240201_000000_test_migration_2),
3✔
492
                ]
3✔
493
            }
3✔
494
        }
495

496
        // DB brought up with AnotherTestMigrator. Simulates database brought
497
        // up on an entirely different schema.
498
        {
499
            let db = test_database().await;
500
            migrate_up::<AnotherTestMigrator>(&db, false, "m20240201_000000_test_migration_2")
501
                .await
502
                .unwrap();
503
            assert_eq!(
504
                applied_migrations(&db).await.unwrap(),
505
                vec![
506
                    "m20240101_000000_test_migration_1",
507
                    "m20240201_000000_test_migration_2",
508
                ]
509
            );
510

511
            // Use wrong TestMigrator, result should be incompatible.
512
            assert!(matches!(
513
                check_database_is_compatible::<TestMigrator>(&db).await,
514
                Err(Error::DbNotCompatible),
515
            ));
516
        }
517
        {
518
            let db = test_database().await;
519
            migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
520
                .await
521
                .unwrap();
522
            assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
523

524
            // Use wrong TestMigrator, result should be incompatible.
525
            assert!(matches!(
526
                check_database_is_compatible::<AnotherTestMigrator>(&db).await,
527
                Err(Error::DbNotCompatible),
528
            ));
529
        }
530
    }
531

532
    #[async_std::test]
2✔
533
    async fn reject_incompatible_db_using_outdated_migrator() {
534
        install_tracing_subscriber();
535

536
        // To latest
537
        let db = test_database().await;
538
        migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
539
            .await
540
            .unwrap();
541
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
542

543
        // Insert an additional fake migration, to simulate the database having
544
        // a newer schema than this tool supports.
545
        seaql_migrations::ActiveModel {
546
            version: ActiveValue::Set("m20230601_000000_test_migration_6".to_owned()),
547
            applied_at: ActiveValue::Set(
548
                SystemTime::now()
549
                    .duration_since(SystemTime::UNIX_EPOCH)
550
                    .unwrap()
551
                    .as_secs() as i64,
552
            ),
553
        }
554
        .insert(&db)
555
        .await
556
        .unwrap();
557

558
        assert!(matches!(
559
            check_database_is_compatible::<TestMigrator>(&db).await,
560
            Err(Error::DbNotCompatible),
561
        ));
562
    }
563

564
    #[async_std::test]
2✔
565
    async fn reject_incompatible_db_tampered() {
566
        install_tracing_subscriber();
567

568
        let db = test_database().await;
569

570
        // To latest
571
        migrate_up::<TestMigrator>(&db, false, "m20230501_000000_test_migration_5")
572
            .await
573
            .unwrap();
574
        assert_eq!(applied_migrations(&db).await.unwrap(), all_migrations());
575

576
        // Tamper with schema table.
577
        seaql_migrations::Entity::delete_by_id("m20230301_000000_test_migration_3")
578
            .exec(&db)
579
            .await
580
            .unwrap();
581
        assert!(matches!(
582
            check_database_is_compatible::<TestMigrator>(&db).await,
583
            Err(Error::DbNotCompatible),
584
        ));
585
    }
586

587
    #[test]
588
    fn ensure_migrations_are_sorted() {
1✔
589
        // Migrations in Migrator must be in lexicographic order, otherwise
1✔
590
        // this CLI will not work correctly.
1✔
591
        assert!(Migrator::migrations()
1✔
592
            .windows(2)
1✔
593
            .all(|window| window[0].name() <= window[1].name()))
24✔
594
    }
1✔
595

596
    fn install_tracing_subscriber() {
7✔
597
        static INSTALL_TRACE_SUBSCRIBER: Once = Once::new();
598
        INSTALL_TRACE_SUBSCRIBER.call_once(|| {
7✔
599
            tracing_subscriber::fmt()
7✔
600
                .with_env_filter(EnvFilter::from_default_env())
7✔
601
                .with_test_writer()
7✔
602
                .init();
7✔
603
        });
7✔
604
    }
7✔
605
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc