• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16139624707

08 Jul 2025 09:37AM UTC coverage: 67.518% (+9.7%) from 57.787%
16139624707

Pull #10036

github

web-flow
Merge cb959bddb into b815109b8
Pull Request #10036: [graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL

0 of 204 new or added lines in 3 files covered. (0.0%)

27 existing lines in 7 files now uncovered.

135164 of 200190 relevant lines covered (67.52%)

21804.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.78
/sqldb/migrations.go
1
package sqldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "io/fs"
11
        "net/http"
12
        "reflect"
13
        "strings"
14
        "time"
15

16
        "github.com/btcsuite/btclog/v2"
17
        "github.com/davecgh/go-spew/spew"
18
        "github.com/golang-migrate/migrate/v4"
19
        "github.com/golang-migrate/migrate/v4/database"
20
        "github.com/golang-migrate/migrate/v4/source/httpfs"
21
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
22
        "github.com/pmezard/go-difflib/difflib"
23
)
24

25
var (
26
        // migrationConfig defines a list of migrations to be applied to the
27
        // database. Each migration is assigned a version number, determining
28
        // its execution order.
29
        // The schema version, tracked by golang-migrate, ensures migrations are
30
        // applied to the correct schema. For migrations involving only schema
31
        // changes, the migration function can be left nil. For custom
32
        // migrations an implemented migration function is required.
33
        //
34
        // NOTE: The migration function may have runtime dependencies, which
35
        // must be injected during runtime.
36
        migrationConfig = append([]MigrationConfig{
37
                {
38
                        Name:          "000001_invoices",
39
                        Version:       1,
40
                        SchemaVersion: 1,
41
                },
42
                {
43
                        Name:          "000002_amp_invoices",
44
                        Version:       2,
45
                        SchemaVersion: 2,
46
                },
47
                {
48
                        Name:          "000003_invoice_events",
49
                        Version:       3,
50
                        SchemaVersion: 3,
51
                },
52
                {
53
                        Name:          "000004_invoice_expiry_fix",
54
                        Version:       4,
55
                        SchemaVersion: 4,
56
                },
57
                {
58
                        Name:          "000005_migration_tracker",
59
                        Version:       5,
60
                        SchemaVersion: 5,
61
                },
62
                {
63
                        Name:          "000006_invoice_migration",
64
                        Version:       6,
65
                        SchemaVersion: 6,
66
                },
67
                {
68
                        Name:          "kv_invoice_migration",
69
                        Version:       7,
70
                        SchemaVersion: 6,
71
                        // A migration function is may be attached to this
72
                        // migration to migrate KV invoices to the native SQL
73
                        // schema. This is optional and can be disabled by the
74
                        // user if necessary.
75
                },
76
        }, migrationAdditions...)
77

78
        // ErrMigrationMismatch is returned when a migrated record does not
79
        // match the original record.
80
        ErrMigrationMismatch = fmt.Errorf("migrated record does not match " +
81
                "original record")
82
)
83

84
// MigrationConfig is a configuration struct that describes SQL migrations. Each
85
// migration is associated with a specific schema version and a global database
86
// version. Migrations are applied in the order of their global database
87
// version. If a migration includes a non-nil MigrationFn, it is executed after
88
// the SQL schema has been migrated to the corresponding schema version.
89
type MigrationConfig struct {
90
        // Name is the name of the migration.
91
        Name string
92

93
        // Version represents the "global" database version for this migration.
94
        // Unlike the schema version tracked by golang-migrate, it encompasses
95
        // all migrations, including those managed by golang-migrate as well
96
        // as custom in-code migrations.
97
        Version int
98

99
        // SchemaVersion represents the schema version tracked by golang-migrate
100
        // at which the migration is applied.
101
        SchemaVersion int
102

103
        // MigrationFn is the function executed for custom migrations at the
104
        // specified version. It is used to handle migrations that cannot be
105
        // performed through SQL alone. If set to nil, no custom migration is
106
        // applied.
107
        MigrationFn func(tx *sqlc.Queries) error
108
}
109

110
// MigrationTarget is a functional option that can be passed to applyMigrations
111
// to specify a target version to migrate to.
112
type MigrationTarget func(mig *migrate.Migrate) error
113

114
// MigrationExecutor is an interface that abstracts the migration functionality.
115
type MigrationExecutor interface {
116
        // ExecuteMigrations runs database migrations up to the specified target
117
        // version or all migrations if no target is specified. A migration may
118
        // include a schema change, a custom migration function, or both.
119
        // Developers must ensure that migrations are defined in the correct
120
        // order. Migration details are stored in the global variable
121
        // migrationConfig.
122
        ExecuteMigrations(target MigrationTarget) error
123

124
        // GetSchemaVersion returns the current schema version of the database.
125
        GetSchemaVersion() (int, bool, error)
126

127
        // SetSchemaVersion sets the schema version of the database.
128
        //
129
        // NOTE: This alters the internal database schema tracker. USE WITH
130
        // CAUTION!!!
131
        SetSchemaVersion(version int, dirty bool) error
132
}
133

134
var (
135
        // TargetLatest is a MigrationTarget that migrates to the latest
136
        // version available.
137
        TargetLatest = func(mig *migrate.Migrate) error {
×
138
                return mig.Up()
×
139
        }
×
140

141
        // TargetVersion is a MigrationTarget that migrates to the given
142
        // version.
143
        TargetVersion = func(version uint) MigrationTarget {
3,675✔
144
                return func(mig *migrate.Migrate) error {
7,350✔
145
                        return mig.Migrate(version)
3,675✔
146
                }
3,675✔
147
        }
148
)
149

150
// GetMigrations returns a copy of the migration configuration.
151
func GetMigrations() []MigrationConfig {
520✔
152
        migrations := make([]MigrationConfig, len(migrationConfig))
520✔
153
        copy(migrations, migrationConfig)
520✔
154

520✔
155
        return migrations
520✔
156
}
520✔
157

158
// migrationLogger is a logger that wraps the passed btclog.Logger so it can be
159
// used to log migrations.
160
type migrationLogger struct {
161
        log btclog.Logger
162
}
163

164
// Printf is like fmt.Printf. We map this to the target logger based on the
165
// current log level.
166
func (m *migrationLogger) Printf(format string, v ...interface{}) {
3,150✔
167
        // Trim trailing newlines from the format.
3,150✔
168
        format = strings.TrimRight(format, "\n")
3,150✔
169

3,150✔
170
        switch m.log.Level() {
3,150✔
171
        case btclog.LevelTrace:
×
172
                m.log.Tracef(format, v...)
×
173
        case btclog.LevelDebug:
×
174
                m.log.Debugf(format, v...)
×
175
        case btclog.LevelInfo:
×
176
                m.log.Infof(format, v...)
×
177
        case btclog.LevelWarn:
×
178
                m.log.Warnf(format, v...)
×
179
        case btclog.LevelError:
×
180
                m.log.Errorf(format, v...)
×
181
        case btclog.LevelCritical:
×
182
                m.log.Criticalf(format, v...)
×
183
        case btclog.LevelOff:
3,150✔
184
        }
185
}
186

187
// Verbose should return true when verbose logging output is wanted
188
func (m *migrationLogger) Verbose() bool {
9,448✔
189
        return m.log.Level() <= btclog.LevelDebug
9,448✔
190
}
9,448✔
191

192
// applyMigrations executes all database migration files found in the given file
193
// system under the given path, using the passed database driver and database
194
// name.
195
func applyMigrations(fs fs.FS, driver database.Driver, path,
196
        dbName string, targetVersion MigrationTarget) error {
3,675✔
197

3,675✔
198
        // With the migrate instance open, we'll create a new migration source
3,675✔
199
        // using the embedded file system stored in sqlSchemas. The library
3,675✔
200
        // we're using can't handle a raw file system interface, so we wrap it
3,675✔
201
        // in this intermediate layer.
3,675✔
202
        migrateFileServer, err := httpfs.New(http.FS(fs), path)
3,675✔
203
        if err != nil {
3,675✔
204
                return err
×
205
        }
×
206

207
        // Finally, we'll run the migration with our driver above based on the
208
        // open DB, and also the migration source stored in the file system
209
        // above.
210
        sqlMigrate, err := migrate.NewWithInstance(
3,675✔
211
                "migrations", migrateFileServer, dbName, driver,
3,675✔
212
        )
3,675✔
213
        if err != nil {
3,675✔
214
                return err
×
215
        }
×
216

217
        migrationVersion, _, err := sqlMigrate.Version()
3,675✔
218
        if err != nil && !errors.Is(err, migrate.ErrNilVersion) {
3,675✔
219
                log.Errorf("Unable to determine current migration version: %v",
×
220
                        err)
×
221

×
222
                return err
×
223
        }
×
224

225
        log.Infof("Applying migrations from version=%v", migrationVersion)
3,675✔
226

3,675✔
227
        // Apply our local logger to the migration instance.
3,675✔
228
        sqlMigrate.Log = &migrationLogger{log}
3,675✔
229

3,675✔
230
        // Execute the migration based on the target given.
3,675✔
231
        err = targetVersion(sqlMigrate)
3,675✔
232
        if err != nil && !errors.Is(err, migrate.ErrNoChange) {
3,675✔
233
                return err
×
234
        }
×
235

236
        return nil
3,675✔
237
}
238

239
// replacerFS is an implementation of a fs.FS virtual file system that wraps an
240
// existing file system but does a search-and-replace operation on each file
241
// when it is opened.
242
type replacerFS struct {
243
        parentFS fs.FS
244
        replaces map[string]string
245
}
246

247
// A compile-time assertion to make sure replacerFS implements the fs.FS
248
// interface.
249
var _ fs.FS = (*replacerFS)(nil)
250

251
// newReplacerFS creates a new replacer file system, wrapping the given parent
252
// virtual file system. Each file within the file system is undergoing a
253
// search-and-replace operation when it is opened, using the given map where the
254
// key denotes the search term and the value the term to replace each occurrence
255
// with.
256
func newReplacerFS(parent fs.FS, replaces map[string]string) *replacerFS {
3,675✔
257
        return &replacerFS{
3,675✔
258
                parentFS: parent,
3,675✔
259
                replaces: replaces,
3,675✔
260
        }
3,675✔
261
}
3,675✔
262

263
// Open opens a file in the virtual file system.
264
//
265
// NOTE: This is part of the fs.FS interface.
266
func (t *replacerFS) Open(name string) (fs.File, error) {
13,644✔
267
        f, err := t.parentFS.Open(name)
13,644✔
268
        if err != nil {
13,644✔
269
                return nil, err
×
270
        }
×
271

272
        stat, err := f.Stat()
13,644✔
273
        if err != nil {
13,644✔
274
                return nil, err
×
275
        }
×
276

277
        if stat.IsDir() {
17,319✔
278
                return f, err
3,675✔
279
        }
3,675✔
280

281
        return newReplacerFile(f, t.replaces)
9,969✔
282
}
283

284
// replacerFile is an fs.File that serves the content of a parent file with a
// set of search-and-replace rules already applied. The rewritten content is
// produced once, when the instance is created, and kept in memory.
type replacerFile struct {
	// parentFile is the wrapped file. It is kept open for Stat and is
	// released by Close.
	parentFile fs.File

	// buf holds the rewritten content that Read serves.
	buf bytes.Buffer
}

// A compile-time assertion to make sure replacerFile implements the fs.File
// interface.
var _ fs.File = (*replacerFile)(nil)

// newReplacerFile reads the parent file to the end and stores its content with
// every occurrence of each key of replaces substituted by the corresponding
// value. The replacements are applied one after another in map iteration
// order, so search terms that overlap each other's input or output can give
// order-dependent results. The parent file is not closed here, not even when
// reading fails; on success it is closed through the returned file's Close.
func newReplacerFile(parent fs.File, replaces map[string]string) (*replacerFile,
	error) {

	content, err := io.ReadAll(parent)
	if err != nil {
		return nil, err
	}

	contentStr := string(content)
	for from, to := range replaces {
		contentStr = strings.ReplaceAll(contentStr, from, to)
	}

	// Write straight into the struct's buffer instead of filling a local
	// buffer and copying it by value. WriteString on a bytes.Buffer always
	// returns a nil error, so there is nothing to check.
	file := &replacerFile{parentFile: parent}
	file.buf.WriteString(contentStr)

	return file, nil
}

// Stat returns statistics/info about the file. The info comes from the parent
// file, so it describes the file as stored, not the rewritten content.
//
// NOTE: This is part of the fs.File interface.
func (t *replacerFile) Stat() (fs.FileInfo, error) {
	return t.parentFile.Stat()
}

// Read reads as many bytes as possible from the rewritten content into the
// given slice.
//
// NOTE: This is part of the fs.File interface.
func (t *replacerFile) Read(p []byte) (int, error) {
	return t.buf.Read(p)
}

// Close closes the underlying parent file. The parent was fully read when this
// instance was created but is still open at that point, so it must be released
// here to avoid leaking it.
//
// NOTE: This is part of the fs.File interface.
func (t *replacerFile) Close() error {
	return t.parentFile.Close()
}
340

341
// ApplyMigrations applies the provided migrations to the database in sequence.
342
// It ensures migrations are executed in the correct order, applying both custom
343
// migration functions and SQL migrations as needed.
344
func ApplyMigrations(ctx context.Context, db *BaseDB,
345
        migrator MigrationExecutor, migrations []MigrationConfig) error {
547✔
346

547✔
347
        // Ensure that the migrations are sorted by version.
547✔
348
        for i := 0; i < len(migrations); i++ {
4,268✔
349
                if migrations[i].Version != i+1 {
3,727✔
350
                        return fmt.Errorf("migration version %d is out of "+
6✔
351
                                "order. Expected %d", migrations[i].Version,
6✔
352
                                i+1)
6✔
353
                }
6✔
354
        }
355
        // Construct a transaction executor to apply custom migrations.
356
        executor := NewTransactionExecutor(db, func(tx *sql.Tx) *sqlc.Queries {
4,212✔
357
                return db.WithTx(tx)
3,671✔
358
        })
3,671✔
359

360
        currentVersion := 0
541✔
361
        version, err := db.GetDatabaseVersion(ctx)
541✔
362
        if !errors.Is(err, sql.ErrNoRows) {
555✔
363
                if err != nil {
14✔
364
                        return fmt.Errorf("error getting current database "+
×
365
                                "version: %w", err)
×
366
                }
×
367

368
                currentVersion = int(version)
14✔
369
        } else {
527✔
370
                // Since we don't have a version tracked by our own table yet,
527✔
371
                // we'll use the schema version reported by sqlc to determine
527✔
372
                // the current version.
527✔
373
                //
527✔
374
                // NOTE: This is safe because the first in-code migration was
527✔
375
                // introduced in version 7. This is only possible if the user
527✔
376
                // has a schema version <= 4.
527✔
377
                var dirty bool
527✔
378
                currentVersion, dirty, err = migrator.GetSchemaVersion()
527✔
379
                if err != nil {
527✔
380
                        return err
×
381
                }
×
382

383
                log.Infof("No database version found, using schema version %d "+
527✔
384
                        "(dirty=%v) as base version", currentVersion, dirty)
527✔
385
        }
386

387
        // Due to an a migration issue in v0.19.0-rc1 we may be at version 2 and
388
        // have a dirty schema due to failing migration 3. If this is indeed the
389
        // case, we need to reset the dirty flag to be able to apply the fixed
390
        // migration.
391
        // NOTE: this could be removed as soon as we drop v0.19.0-beta.
392
        if version == 2 {
543✔
393
                schemaVersion, dirty, err := migrator.GetSchemaVersion()
2✔
394
                if err != nil {
2✔
395
                        return err
×
396
                }
×
397

398
                if schemaVersion == 3 && dirty {
4✔
399
                        log.Warnf("Schema version %d is dirty. This is "+
2✔
400
                                "likely a consequence of a failed migration "+
2✔
401
                                "in v0.19.0-rc1. Attempting to recover by "+
2✔
402
                                "resetting the dirty flag", schemaVersion)
2✔
403

2✔
404
                        err = migrator.SetSchemaVersion(4, false)
2✔
405
                        if err != nil {
2✔
406
                                return err
×
407
                        }
×
408
                }
409
        }
410

411
        for _, migration := range migrations {
4,256✔
412
                if migration.Version <= currentVersion {
3,759✔
413
                        log.Infof("Skipping migration '%s' (version %d) as it "+
44✔
414
                                "has already been applied", migration.Name,
44✔
415
                                migration.Version)
44✔
416

44✔
417
                        continue
44✔
418
                }
419

420
                log.Infof("Migrating SQL schema to version %d",
3,671✔
421
                        migration.SchemaVersion)
3,671✔
422

3,671✔
423
                // Execute SQL schema migrations up to the target version.
3,671✔
424
                err = migrator.ExecuteMigrations(
3,671✔
425
                        TargetVersion(uint(migration.SchemaVersion)),
3,671✔
426
                )
3,671✔
427
                if err != nil {
3,671✔
428
                        return fmt.Errorf("error executing schema migrations "+
×
429
                                "to target version %d: %w",
×
430
                                migration.SchemaVersion, err)
×
431
                }
×
432

433
                opts := WriteTxOpt()
3,671✔
434

3,671✔
435
                // Run the custom migration as a transaction to ensure
3,671✔
436
                // atomicity. If successful, mark the migration as complete in
3,671✔
437
                // the migration tracker table.
3,671✔
438
                err = executor.ExecTx(ctx, opts, func(tx *sqlc.Queries) error {
7,342✔
439
                        // Apply the migration function if one is provided.
3,671✔
440
                        if migration.MigrationFn != nil {
3,697✔
441
                                log.Infof("Applying custom migration '%v' "+
26✔
442
                                        "(version %d) to schema version %d",
26✔
443
                                        migration.Name, migration.Version,
26✔
444
                                        migration.SchemaVersion)
26✔
445

26✔
446
                                err = migration.MigrationFn(tx)
26✔
447
                                if err != nil {
32✔
448
                                        return fmt.Errorf("error applying "+
6✔
449
                                                "migration '%v' (version %d) "+
6✔
450
                                                "to schema version %d: %w",
6✔
451
                                                migration.Name,
6✔
452
                                                migration.Version,
6✔
453
                                                migration.SchemaVersion, err)
6✔
454
                                }
6✔
455

456
                                log.Infof("Migration '%v' (version %d) "+
20✔
457
                                        "applied ", migration.Name,
20✔
458
                                        migration.Version)
20✔
459
                        }
460

461
                        // Mark the migration as complete by adding the version
462
                        // to the migration tracker table along with the current
463
                        // timestamp.
464
                        err = tx.SetMigration(ctx, sqlc.SetMigrationParams{
3,665✔
465
                                Version:       int32(migration.Version),
3,665✔
466
                                MigrationTime: time.Now(),
3,665✔
467
                        })
3,665✔
468
                        if err != nil {
3,665✔
469
                                return fmt.Errorf("error setting migration "+
×
470
                                        "version %d: %w", migration.Version,
×
471
                                        err)
×
472
                        }
×
473

474
                        return nil
3,665✔
475
                }, func() {})
3,671✔
476
                if err != nil {
3,677✔
477
                        return err
6✔
478
                }
6✔
479
        }
480

481
        return nil
535✔
482
}
483

484
// CompareRecords checks if the original and migrated objects are equal. If
485
// they are not, it returns an error with a unified diff of the two objects.
NEW
486
func CompareRecords(original, migrated any, identifier string) error {
×
NEW
487
        if reflect.DeepEqual(original, migrated) {
×
NEW
488
                return nil
×
NEW
489
        }
×
490

NEW
491
        diff := difflib.UnifiedDiff{
×
NEW
492
                A:        difflib.SplitLines(spew.Sdump(original)),
×
NEW
493
                B:        difflib.SplitLines(spew.Sdump(migrated)),
×
NEW
494
                FromFile: "Expected",
×
NEW
495
                FromDate: "",
×
NEW
496
                ToFile:   "Actual",
×
NEW
497
                ToDate:   "",
×
NEW
498
                Context:  3,
×
NEW
499
        }
×
NEW
500
        diffText, _ := difflib.GetUnifiedDiffString(diff)
×
NEW
501

×
NEW
502
        return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
×
NEW
503
                diffText)
×
504
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc