• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14130607387

28 Mar 2025 01:59PM UTC coverage: 69.03% (-0.006%) from 69.036%
14130607387

push

github

web-flow
Merge pull request #9647 from bhandras/sqldb-migration-base-version

sqldb: establish a base DB version even if it's not yet tracked

48 of 75 new or added lines in 3 files covered. (64.0%)

77 existing lines in 18 files now uncovered.

133408 of 193262 relevant lines covered (69.03%)

22159.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.42
/sqldb/migrations.go
1
package sqldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "io/fs"
11
        "net/http"
12
        "strings"
13
        "time"
14

15
        "github.com/btcsuite/btclog/v2"
16
        "github.com/golang-migrate/migrate/v4"
17
        "github.com/golang-migrate/migrate/v4/database"
18
        "github.com/golang-migrate/migrate/v4/source/httpfs"
19
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
20
)
21

22
var (
23
        // migrationConfig defines a list of migrations to be applied to the
24
        // database. Each migration is assigned a version number, determining
25
        // its execution order.
26
        // The schema version, tracked by golang-migrate, ensures migrations are
27
        // applied to the correct schema. For migrations involving only schema
28
        // changes, the migration function can be left nil. For custom
29
        // migrations an implemented migration function is required.
30
        //
31
        // NOTE: The migration function may have runtime dependencies, which
32
        // must be injected during runtime.
33
        migrationConfig = []MigrationConfig{
34
                {
35
                        Name:          "000001_invoices",
36
                        Version:       1,
37
                        SchemaVersion: 1,
38
                },
39
                {
40
                        Name:          "000002_amp_invoices",
41
                        Version:       2,
42
                        SchemaVersion: 2,
43
                },
44
                {
45
                        Name:          "000003_invoice_events",
46
                        Version:       3,
47
                        SchemaVersion: 3,
48
                },
49
                {
50
                        Name:          "000004_invoice_expiry_fix",
51
                        Version:       4,
52
                        SchemaVersion: 4,
53
                },
54
                {
55
                        Name:          "000005_migration_tracker",
56
                        Version:       5,
57
                        SchemaVersion: 5,
58
                },
59
                {
60
                        Name:          "000006_invoice_migration",
61
                        Version:       6,
62
                        SchemaVersion: 6,
63
                },
64
                {
65
                        Name:          "kv_invoice_migration",
66
                        Version:       7,
67
                        SchemaVersion: 6,
68
                        // A migration function may be attached to this
69
                        // migration to migrate KV invoices to the native SQL
70
                        // schema. This is optional and can be disabled by the
71
                        // user if necessary.
72
                },
73
        }
74
)
75

76
// MigrationConfig is a configuration struct that describes SQL migrations. Each
77
// migration is associated with a specific schema version and a global database
78
// version. Migrations are applied in the order of their global database
79
// version. If a migration includes a non-nil MigrationFn, it is executed after
80
// the SQL schema has been migrated to the corresponding schema version.
81
type MigrationConfig struct {
82
        // Name is the name of the migration.
83
        Name string
84

85
        // Version represents the "global" database version for this migration.
86
        // Unlike the schema version tracked by golang-migrate, it encompasses
87
        // all migrations, including those managed by golang-migrate as well
88
        // as custom in-code migrations.
89
        Version int
90

91
        // SchemaVersion represents the schema version tracked by golang-migrate
92
        // at which the migration is applied.
93
        SchemaVersion int
94

95
        // MigrationFn is the function executed for custom migrations at the
96
        // specified version. It is used to handle migrations that cannot be
97
        // performed through SQL alone. If set to nil, no custom migration is
98
        // applied.
99
        MigrationFn func(tx *sqlc.Queries) error
100
}
101

102
// MigrationTarget is a functional option that can be passed to applyMigrations
103
// to specify a target version to migrate to.
104
type MigrationTarget func(mig *migrate.Migrate) error
105

106
// MigrationExecutor is an interface that abstracts the migration functionality.
107
type MigrationExecutor interface {
108
        // ExecuteMigrations runs database migrations up to the specified target
109
        // version or all migrations if no target is specified. A migration may
110
        // include a schema change, a custom migration function, or both.
111
        // Developers must ensure that migrations are defined in the correct
112
        // order. Migration details are stored in the global variable
113
        // migrationConfig.
114
        ExecuteMigrations(target MigrationTarget) error
115

116
        // GetSchemaVersion returns the current schema version of the database.
117
        GetSchemaVersion() (int, bool, error)
118

119
        // SetSchemaVersion sets the schema version of the database.
120
        //
121
        // NOTE: This alters the internal database schema tracker. USE WITH
122
        // CAUTION!!!
123
        SetSchemaVersion(version int, dirty bool) error
124
}
125

126
var (
127
        // TargetLatest is a MigrationTarget that migrates to the latest
128
        // version available.
129
        TargetLatest = func(mig *migrate.Migrate) error {
×
130
                return mig.Up()
×
131
        }
×
132

133
        // TargetVersion is a MigrationTarget that migrates to the given
134
        // version.
135
        TargetVersion = func(version uint) MigrationTarget {
3,668✔
136
                return func(mig *migrate.Migrate) error {
7,336✔
137
                        return mig.Migrate(version)
3,668✔
138
                }
3,668✔
139
        }
140
)
141

142
// GetMigrations returns a copy of the migration configuration.
143
func GetMigrations() []MigrationConfig {
519✔
144
        migrations := make([]MigrationConfig, len(migrationConfig))
519✔
145
        copy(migrations, migrationConfig)
519✔
146

519✔
147
        return migrations
519✔
148
}
519✔
149

150
// migrationLogger is a logger that wraps the passed btclog.Logger so it can be
151
// used to log migrations.
152
type migrationLogger struct {
153
        log btclog.Logger
154
}
155

156
// Printf is like fmt.Printf. We map this to the target logger based on the
157
// current log level.
158
func (m *migrationLogger) Printf(format string, v ...interface{}) {
3,144✔
159
        // Trim trailing newlines from the format.
3,144✔
160
        format = strings.TrimRight(format, "\n")
3,144✔
161

3,144✔
162
        switch m.log.Level() {
3,144✔
163
        case btclog.LevelTrace:
×
164
                m.log.Tracef(format, v...)
×
165
        case btclog.LevelDebug:
×
166
                m.log.Debugf(format, v...)
×
167
        case btclog.LevelInfo:
×
168
                m.log.Infof(format, v...)
×
169
        case btclog.LevelWarn:
×
170
                m.log.Warnf(format, v...)
×
171
        case btclog.LevelError:
×
172
                m.log.Errorf(format, v...)
×
173
        case btclog.LevelCritical:
×
174
                m.log.Criticalf(format, v...)
×
175
        case btclog.LevelOff:
3,144✔
176
        }
177
}
178

179
// Verbose should return true when verbose logging output is wanted
180
func (m *migrationLogger) Verbose() bool {
9,430✔
181
        return m.log.Level() <= btclog.LevelDebug
9,430✔
182
}
9,430✔
183

184
// applyMigrations executes all database migration files found in the given file
185
// system under the given path, using the passed database driver and database
186
// name.
187
func applyMigrations(fs fs.FS, driver database.Driver, path,
188
        dbName string, targetVersion MigrationTarget) error {
3,668✔
189

3,668✔
190
        // With the migrate instance open, we'll create a new migration source
3,668✔
191
        // using the embedded file system stored in sqlSchemas. The library
3,668✔
192
        // we're using can't handle a raw file system interface, so we wrap it
3,668✔
193
        // in this intermediate layer.
3,668✔
194
        migrateFileServer, err := httpfs.New(http.FS(fs), path)
3,668✔
195
        if err != nil {
3,668✔
196
                return err
×
197
        }
×
198

199
        // Finally, we'll run the migration with our driver above based on the
200
        // open DB, and also the migration source stored in the file system
201
        // above.
202
        sqlMigrate, err := migrate.NewWithInstance(
3,668✔
203
                "migrations", migrateFileServer, dbName, driver,
3,668✔
204
        )
3,668✔
205
        if err != nil {
3,668✔
206
                return err
×
207
        }
×
208

209
        migrationVersion, _, err := sqlMigrate.Version()
3,668✔
210
        if err != nil && !errors.Is(err, migrate.ErrNilVersion) {
3,668✔
211
                log.Errorf("Unable to determine current migration version: %v",
×
212
                        err)
×
213

×
214
                return err
×
215
        }
×
216

217
        log.Infof("Applying migrations from version=%v", migrationVersion)
3,668✔
218

3,668✔
219
        // Apply our local logger to the migration instance.
3,668✔
220
        sqlMigrate.Log = &migrationLogger{log}
3,668✔
221

3,668✔
222
        // Execute the migration based on the target given.
3,668✔
223
        err = targetVersion(sqlMigrate)
3,668✔
224
        if err != nil && !errors.Is(err, migrate.ErrNoChange) {
3,668✔
225
                return err
×
226
        }
×
227

228
        return nil
3,668✔
229
}
230

231
// replacerFS is an implementation of a fs.FS virtual file system that wraps an
// existing file system but does a search-and-replace operation on each file
// when it is opened.
type replacerFS struct {
	parentFS fs.FS
	replaces map[string]string
}

// A compile-time assertion to make sure replacerFS implements the fs.FS
// interface.
var _ fs.FS = (*replacerFS)(nil)

// newReplacerFS creates a new replacer file system, wrapping the given parent
// virtual file system. Each file within the file system is undergoing a
// search-and-replace operation when it is opened, using the given map where the
// key denotes the search term and the value the term to replace each occurrence
// with.
func newReplacerFS(parent fs.FS, replaces map[string]string) *replacerFS {
	return &replacerFS{
		parentFS: parent,
		replaces: replaces,
	}
}

// Open opens a file in the virtual file system. Directories are returned
// untouched from the parent file system, regular files are returned with all
// replacements already applied to their content. If anything fails after the
// parent file was opened, the parent file is closed again and a nil file is
// returned along with the error.
//
// NOTE: This is part of the fs.FS interface.
func (t *replacerFS) Open(name string) (fs.File, error) {
	f, err := t.parentFS.Open(name)
	if err != nil {
		return nil, err
	}

	stat, err := f.Stat()
	if err != nil {
		// Best effort cleanup: the Stat error is the one the caller
		// needs to see, so a failure to close is deliberately ignored.
		_ = f.Close()

		return nil, err
	}

	if stat.IsDir() {
		return f, nil
	}

	file, err := newReplacerFile(f, t.replaces)
	if err != nil {
		// Same as above, we still own the parent file on failure.
		_ = f.Close()

		// Return an untyped nil so that the caller never receives a
		// non-nil fs.File interface wrapping a nil pointer.
		return nil, err
	}

	return file, nil
}

// replacerFile is a fs.File whose content is the content of the parent file
// with all replacements applied. The replaced content is held in memory.
type replacerFile struct {
	parentFile fs.File
	buf        bytes.Buffer
}

// A compile-time assertion to make sure replacerFile implements the fs.File
// interface.
var _ fs.File = (*replacerFile)(nil)

// newReplacerFile reads the parent file completely and applies every entry of
// replaces (key: search term, value: replacement) to its content. On success
// the returned file takes ownership of parent and closes it in Close. On
// failure the parent is left open and remains owned by the caller.
//
// NOTE: The replacements are applied one after another in Go's unspecified map
// iteration order, so the entries should not depend on each other.
func newReplacerFile(parent fs.File, replaces map[string]string) (*replacerFile,
	error) {

	content, err := io.ReadAll(parent)
	if err != nil {
		return nil, err
	}

	contentStr := string(content)
	for from, to := range replaces {
		contentStr = strings.ReplaceAll(contentStr, from, to)
	}

	file := &replacerFile{
		parentFile: parent,
	}

	// Fill the buffer in place rather than copying a populated
	// bytes.Buffer into the struct.
	_, err = file.buf.WriteString(contentStr)
	if err != nil {
		return nil, err
	}

	return file, nil
}

// Stat returns statistics/info about the file. The call is forwarded to the
// parent file, so the reported values (such as the size) describe the
// original file and not the replaced content.
//
// NOTE: This is part of the fs.File interface.
func (t *replacerFile) Stat() (fs.FileInfo, error) {
	return t.parentFile.Stat()
}

// Read reads as many bytes as possible of the replaced content into the given
// slice.
//
// NOTE: This is part of the fs.File interface.
func (t *replacerFile) Read(p []byte) (int, error) {
	return t.buf.Read(p)
}

// Close closes the underlying parent file. The parent was fully read when this
// instance was created but is kept open until now so that Stat keeps working
// for the lifetime of the file.
//
// NOTE: This is part of the fs.File interface.
func (t *replacerFile) Close() error {
	return t.parentFile.Close()
}
332

333
// MigrationTxOptions implements the TxOptions interface for the transactions
// in which migrations are run.
type MigrationTxOptions struct{}

// ReadOnly always reports false, as migration transactions need to write to
// the database.
func (m *MigrationTxOptions) ReadOnly() bool {
	const readOnly = false

	return readOnly
}
343

344
// ApplyMigrations applies the provided migrations to the database in sequence.
345
// It ensures migrations are executed in the correct order, applying both custom
346
// migration functions and SQL migrations as needed.
347
func ApplyMigrations(ctx context.Context, db *BaseDB,
348
        migrator MigrationExecutor, migrations []MigrationConfig) error {
546✔
349

546✔
350
        // Ensure that the migrations are sorted by version.
546✔
351
        for i := 0; i < len(migrations); i++ {
4,260✔
352
                if migrations[i].Version != i+1 {
3,720✔
353
                        return fmt.Errorf("migration version %d is out of "+
6✔
354
                                "order. Expected %d", migrations[i].Version,
6✔
355
                                i+1)
6✔
356
                }
6✔
357
        }
358
        // Construct a transaction executor to apply custom migrations.
359
        executor := NewTransactionExecutor(db, func(tx *sql.Tx) *sqlc.Queries {
4,204✔
360
                return db.WithTx(tx)
3,664✔
361
        })
3,664✔
362

363
        currentVersion := 0
540✔
364
        version, err := db.GetDatabaseVersion(ctx)
540✔
365
        if !errors.Is(err, sql.ErrNoRows) {
554✔
366
                if err != nil {
14✔
367
                        return fmt.Errorf("error getting current database "+
×
368
                                "version: %w", err)
×
369
                }
×
370

371
                currentVersion = int(version)
14✔
372
        } else {
526✔
373
                // Since we don't have a version tracked by our own table yet,
526✔
374
                // we'll use the schema version reported by sqlc to determine
526✔
375
                // the current version.
526✔
376
                //
526✔
377
                // NOTE: This is safe because the first in-code migration was
526✔
378
                // introduced in version 7. This is only possible if the user
526✔
379
                // has a schema version <= 4.
526✔
380
                var dirty bool
526✔
381
                currentVersion, dirty, err = migrator.GetSchemaVersion()
526✔
382
                if err != nil {
526✔
NEW
383
                        return err
×
NEW
384
                }
×
385

386
                log.Infof("No database version found, using schema version %d "+
526✔
387
                        "(dirty=%v) as base version", currentVersion, dirty)
526✔
388
        }
389

390
        // Due to an a migration issue in v0.19.0-rc1 we may be at version 2 and
391
        // have a dirty schema due to failing migration 3. If this is indeed the
392
        // case, we need to reset the dirty flag to be able to apply the fixed
393
        // migration.
394
        // NOTE: this could be removed as soon as we drop v0.19.0-beta.
395
        if version == 2 {
542✔
396
                schemaVersion, dirty, err := migrator.GetSchemaVersion()
2✔
397
                if err != nil {
2✔
NEW
398
                        return err
×
NEW
399
                }
×
400

401
                if schemaVersion == 3 && dirty {
4✔
402
                        log.Warnf("Schema version %d is dirty. This is "+
2✔
403
                                "likely a consequence of a failed migration "+
2✔
404
                                "in v0.19.0-rc1. Attempting to recover by "+
2✔
405
                                "resetting the dirty flag", schemaVersion)
2✔
406

2✔
407
                        err = migrator.SetSchemaVersion(4, false)
2✔
408
                        if err != nil {
2✔
NEW
409
                                return err
×
NEW
410
                        }
×
411
                }
412
        }
413

414
        for _, migration := range migrations {
4,248✔
415
                if migration.Version <= currentVersion {
3,752✔
416
                        log.Infof("Skipping migration '%s' (version %d) as it "+
44✔
417
                                "has already been applied", migration.Name,
44✔
418
                                migration.Version)
44✔
419

44✔
420
                        continue
44✔
421
                }
422

423
                log.Infof("Migrating SQL schema to version %d",
3,664✔
424
                        migration.SchemaVersion)
3,664✔
425

3,664✔
426
                // Execute SQL schema migrations up to the target version.
3,664✔
427
                err = migrator.ExecuteMigrations(
3,664✔
428
                        TargetVersion(uint(migration.SchemaVersion)),
3,664✔
429
                )
3,664✔
430
                if err != nil {
3,664✔
431
                        return fmt.Errorf("error executing schema migrations "+
×
432
                                "to target version %d: %w",
×
433
                                migration.SchemaVersion, err)
×
434
                }
×
435

436
                var opts MigrationTxOptions
3,664✔
437

3,664✔
438
                // Run the custom migration as a transaction to ensure
3,664✔
439
                // atomicity. If successful, mark the migration as complete in
3,664✔
440
                // the migration tracker table.
3,664✔
441
                err = executor.ExecTx(ctx, &opts, func(tx *sqlc.Queries) error {
7,328✔
442
                        // Apply the migration function if one is provided.
3,664✔
443
                        if migration.MigrationFn != nil {
3,690✔
444
                                log.Infof("Applying custom migration '%v' "+
26✔
445
                                        "(version %d) to schema version %d",
26✔
446
                                        migration.Name, migration.Version,
26✔
447
                                        migration.SchemaVersion)
26✔
448

26✔
449
                                err = migration.MigrationFn(tx)
26✔
450
                                if err != nil {
32✔
451
                                        return fmt.Errorf("error applying "+
6✔
452
                                                "migration '%v' (version %d) "+
6✔
453
                                                "to schema version %d: %w",
6✔
454
                                                migration.Name,
6✔
455
                                                migration.Version,
6✔
456
                                                migration.SchemaVersion, err)
6✔
457
                                }
6✔
458

459
                                log.Infof("Migration '%v' (version %d) "+
20✔
460
                                        "applied ", migration.Name,
20✔
461
                                        migration.Version)
20✔
462
                        }
463

464
                        // Mark the migration as complete by adding the version
465
                        // to the migration tracker table along with the current
466
                        // timestamp.
467
                        err = tx.SetMigration(ctx, sqlc.SetMigrationParams{
3,658✔
468
                                Version:       int32(migration.Version),
3,658✔
469
                                MigrationTime: time.Now(),
3,658✔
470
                        })
3,658✔
471
                        if err != nil {
3,658✔
472
                                return fmt.Errorf("error setting migration "+
×
473
                                        "version %d: %w", migration.Version,
×
474
                                        err)
×
475
                        }
×
476

477
                        return nil
3,658✔
478
                }, func() {})
3,664✔
479
                if err != nil {
3,670✔
480
                        return err
6✔
481
                }
6✔
482
        }
483

484
        return nil
534✔
485
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc