lightningnetwork / lnd — build 16073856910
04 Jul 2025 12:32PM UTC — coverage: 57.555%. First build.

Pull Request #10038: [graph mig 3]: graph/db: migrate zombies, closed SCIDs, prune log from kvdb to SQL
Merge d6ac5738a into 47376426c

0 of 229 new or added lines in 1 file covered (0.0%).
98434 of 171026 relevant lines covered (57.55%).
1.79 hits per line.

Source file: /graph/db/sql_migration.go (0.0% of new lines covered)

package graphdb

import (
	"bytes"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/sqldb/sqlc"
	"github.com/pmezard/go-difflib/difflib"
)

// ErrMigrationMismatch is returned when a migrated graph record does not match
// the original record.
var ErrMigrationMismatch = fmt.Errorf("migrated graph record does not match " +
	"original record")

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()

	// Check if there is a graph to migrate.
	graphExists, err := checkGraphExists(kvBackend)
	if err != nil {
		return fmt.Errorf("failed to check graph existence: %w", err)
	}
	if !graphExists {
		log.Infof("No graph found in KV store, skipping the migration")
		return nil
	}

	// 1) Migrate all the nodes.
	if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// 2) Migrate the source node.
	if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate source node: %w", err)
	}

	// 3) Migrate all the channels and channel policies.
	err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	// 4) Migrate the prune log.
	if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// 5) Migrate the closed SCID index.
	err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// 6) Migrate the zombie index.
	if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

	return nil
}
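
// Example (hypothetical) usage sketch: the helpers openKVGraphBackend and
// newSQLGraphQueries below are placeholders and not real lnd APIs; only
// MigrateGraphToSQL itself is defined in this file. The sketch simply shows
// how the migration entry point might be invoked once it is wired up.
//
//	func runGraphMigration(ctx context.Context, chain chainhash.Hash) error {
//		// Open the existing KV graph store (assumed helper).
//		kvBackend, err := openKVGraphBackend()
//		if err != nil {
//			return err
//		}
//
//		// Obtain the SQL query interface (assumed helper).
//		sqlDB, err := newSQLGraphQueries()
//		if err != nil {
//			return err
//		}
//
//		// Run the full KV to SQL graph migration.
//		return MigrateGraphToSQL(ctx, kvBackend, sqlDB, chain)
//	}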

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
	// Check if there is even a graph to migrate.
	err := db.View(func(tx kvdb.RTx) error {
		// Check for the existence of the node bucket which is a top
		// level bucket that would have been created on the initial
		// creation of the graph store.
		nodes := tx.ReadBucket(nodeBucket)
		if nodes == nil {
			return ErrGraphNotFound
		}

		return nil
	}, func() {})
	if errors.Is(err, ErrGraphNotFound) {
		return false, nil
	} else if err != nil {
		return false, fmt.Errorf("failed to check graph existence: %w",
			err)
	}

	return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	// Keep track of the number of nodes migrated and the number of
	// nodes skipped due to errors.
	var (
		count   uint64
		skipped uint64
	)

	// Loop through each node in the KV store and insert it into the SQL
	// database.
	err := forEachNode(kvBackend, func(_ kvdb.RTx,
		node *models.LightningNode) error {

		pub := node.PubKeyBytes

		// Sanity check to ensure that the node has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a node with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skipped++
			log.Warnf("Skipping migration of node %x with invalid "+
				"extra opaque data: %v", pub,
				node.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra "+
				"opaque data for node %x: %w", pub, err)
		}

		count++

		// Write the node to the SQL database.
		id, err := upsertNode(ctx, sqlDB, node)
		if err != nil {
			return fmt.Errorf("could not persist node(%x): %w", pub,
				err)
		}

		// Fetch it from the SQL store and compare it against the
		// original node object to ensure the migration was successful.
		dbNode, err := sqlDB.GetNodeByPubKey(
			ctx, sqlc.GetNodeByPubKeyParams{
				PubKey:  node.PubKeyBytes[:],
				Version: int16(ProtocolV1),
			},
		)
		if err != nil {
			return fmt.Errorf("could not get node by pubkey (%x) "+
				"after migration: %w", pub, err)
		}

		// Sanity check: ensure the migrated node ID matches the one we
		// just inserted.
		if dbNode.ID != id {
			return fmt.Errorf("node ID mismatch for node (%x) "+
				"after migration: expected %d, got %d",
				pub, id, dbNode.ID)
		}

		migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
		if err != nil {
			return fmt.Errorf("could not build migrated node "+
				"from dbNode(db id: %d, node pub: %x): %w",
				dbNode.ID, pub, err)
		}

		// Make sure that the node addresses are sorted before
		// comparing them to ensure that the order of addresses does
		// not affect the comparison.
		sort.Slice(node.Addresses, func(i, j int) bool {
			return node.Addresses[i].String() <
				node.Addresses[j].String()
		})
		sort.Slice(migratedNode.Addresses, func(i, j int) bool {
			return migratedNode.Addresses[i].String() <
				migratedNode.Addresses[j].String()
		})

		return compare(node, migratedNode, fmt.Sprintf("node %x", pub))
	})
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
		"invalid TLV streams)", count, skipped)

	return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
	sqlDB SQLQueries) error {

	sourceNode, err := sourceNode(kvdb)
	if errors.Is(err, ErrSourceNodeNotSet) {
		// If the source node has not been set yet, we can skip this
		// migration step.
		return nil
	} else if err != nil {
		return fmt.Errorf("could not get source node from kv "+
			"store: %w", err)
	}

	pub := sourceNode.PubKeyBytes

	// Get the DB ID of the source node by its public key. This node must
	// already exist in the SQL database, as it should have been migrated
	// in the previous node-migration step.
	id, err := sqlDB.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			PubKey:  pub[:],
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get source node ID: %w", err)
	}

	// Now we can add the source node to the SQL database.
	err = sqlDB.AddSourceNode(ctx, id)
	if err != nil {
		return fmt.Errorf("could not add source node to SQL store: %w",
			err)
	}

	// Verify that the source node was added correctly by fetching it back
	// from the SQL database and checking that the expected DB ID and
	// pub key are returned. We don't need to do a whole node comparison
	// here, as this was already done in the previous migration step.
	srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
	if err != nil {
		return fmt.Errorf("could not get source nodes from SQL "+
			"store: %w", err)
	}

	// The SQL store has support for multiple source nodes (for future
	// protocol versions) but this migration is purely aimed at the V1
	// store, and so we expect exactly one source node to be present.
	if len(srcNodes) != 1 {
		return fmt.Errorf("expected exactly one source node, "+
			"got %d", len(srcNodes))
	}

	// Check that the source node ID and pub key match the original
	// source node.
	if srcNodes[0].NodeID != id {
		return fmt.Errorf("source node ID mismatch after migration: "+
			"expected %d, got %d", id, srcNodes[0].NodeID)
	}
	err = compare(pub[:], srcNodes[0].PubKey, "source node")
	if err != nil {
		return fmt.Errorf("source node pubkey mismatch after "+
			"migration: %w", err)
	}

	log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

	return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	var (
		channelCount       uint64
		skippedChanCount   uint64
		policyCount        uint64
		skippedPolicyCount uint64
	)
	migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
		// If the policy is nil, we can skip it.
		if policy == nil {
			return nil
		}

		// Sanity check to ensure that the policy has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a policy with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(policy.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skippedPolicyCount++
			log.Warnf("Skipping policy for channel %d with "+
				"invalid extra opaque data: %v",
				policy.ChannelID, policy.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data: %w. %+v", err, policy.ExtraOpaqueData)
		}

		policyCount++

		_, _, _, err = updateChanEdgePolicy(ctx, sqlDB, policy)
		if err != nil {
			return fmt.Errorf("could not migrate channel "+
				"policy %d: %w", policy.ChannelID, err)
		}

		return nil
	}

	// Iterate over each channel in the KV store and migrate it and its
	// policies to the SQL database.
	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
		policy1 *models.ChannelEdgePolicy,
		policy2 *models.ChannelEdgePolicy) error {

		scid := channel.ChannelID

		// Here, we do a sanity check to ensure that the chain hash of
		// the channel returned by the KV store matches the expected
		// chain hash. This is important since in the SQL store, we will
		// no longer explicitly store the chain hash in the channel
		// info, but rather rely on the chain hash LND is running with.
		// So this is our way of ensuring that LND is running on the
		// correct network at migration time.
		if channel.ChainHash != chain {
			return fmt.Errorf("channel %d has chain hash %s, "+
				"expected %s", scid, channel.ChainHash, chain)
		}

		// Sanity check to ensure that the channel has valid extra
		// opaque data. If it does not, we'll skip it. We need to do
		// this because previously we would just persist any TLV bytes
		// that we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a channel with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			log.Warnf("Skipping channel %d with invalid "+
				"extra opaque data: %v", scid,
				channel.ExtraOpaqueData)

			skippedChanCount++

			// If we skip a channel, we also skip its policies.
			if policy1 != nil {
				skippedPolicyCount++
			}
			if policy2 != nil {
				skippedPolicyCount++
			}

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data for channel %d: %w %v", scid, err,
				channel.ExtraOpaqueData)
		}

		channelCount++
		err = migrateSingleChannel(
			ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
		)
		if err != nil {
			return fmt.Errorf("could not migrate channel %d: %w",
				scid, err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	log.Infof("Migrated %d channels and %d policies from KV to SQL "+
		"(skipped %d channels and %d policies due to invalid TLV "+
		"streams)", channelCount, policyCount, skippedChanCount,
		skippedPolicyCount)

	return nil
}

// migrateSingleChannel migrates a single channel and its two channel policies
// from the KV backend to the SQL database, and then performs a sanity check to
// ensure that the migrated channel and policies match the original records.
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
	channel *models.ChannelEdgeInfo,
	policy1, policy2 *models.ChannelEdgePolicy,
	migChanPolicy func(*models.ChannelEdgePolicy) error) error {

	scid := channel.ChannelID

	// First, migrate the channel info along with its policies.
	dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
	if err != nil {
		return fmt.Errorf("could not insert record for channel %d "+
			"in SQL store: %w", scid, err)
	}

	// Now, migrate the two channel policies.
	err = migChanPolicy(policy1)
	if err != nil {
		return fmt.Errorf("could not migrate policy1(%d): %w", scid,
			err)
	}
	err = migChanPolicy(policy2)
	if err != nil {
		return fmt.Errorf("could not migrate policy2(%d): %w", scid,
			err)
	}

	// Now, fetch the channel and its policies from the SQL DB.
	row, err := sqlDB.GetChannelBySCIDWithPolicies(
		ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
			Scid:    channelIDToBytes(scid),
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
			err)
	}

	// Assert that the DB IDs for the channel and nodes are as expected
	// given the inserted channel info.
	err = compare(dbChanInfo.channelID, row.Channel.ID, "channel DB ID")
	if err != nil {
		return err
	}
	err = compare(dbChanInfo.node1ID, row.Node.ID, "node1 DB ID")
	if err != nil {
		return err
	}
	err = compare(dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID")
	if err != nil {
		return err
	}

	migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
		ctx, sqlDB, row, channel.ChainHash,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel and "+
			"policies: %w", err)
	}

	// Finally, compare the original channel info and
	// policies with the migrated ones to ensure they match.
	if len(channel.ExtraOpaqueData) == 0 {
		channel.ExtraOpaqueData = nil
	}
	if len(migChan.ExtraOpaqueData) == 0 {
		migChan.ExtraOpaqueData = nil
	}

	err = compare(channel, migChan, fmt.Sprintf("channel %d", scid))
	if err != nil {
		return err
	}

	checkPolicy := func(expPolicy,
		migPolicy *models.ChannelEdgePolicy) error {

		switch {
		// Both policies are nil, nothing to compare.
		case expPolicy == nil && migPolicy == nil:
			return nil

		// One of the policies is nil, but the other is not.
		case expPolicy == nil || migPolicy == nil:
			return fmt.Errorf("expected both policies to be "+
				"non-nil. Got expPolicy: %v, "+
				"migPolicy: %v", expPolicy, migPolicy)

		// Both policies are non-nil, we can compare them.
		default:
		}

		if len(expPolicy.ExtraOpaqueData) == 0 {
			expPolicy.ExtraOpaqueData = nil
		}
		if len(migPolicy.ExtraOpaqueData) == 0 {
			migPolicy.ExtraOpaqueData = nil
		}

		return compare(*expPolicy, *migPolicy, "channel policy")
	}

	err = checkPolicy(policy1, migPol1)
	if err != nil {
		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
			err)
	}

	err = checkPolicy(policy2, migPol2)
	if err != nil {
		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
			err)
	}

	return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var (
		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash
	)

	// migrateSinglePruneEntry is a helper function that inserts a single
	// prune log entry into the SQL database and verifies that it was
	// inserted correctly.
	migrateSinglePruneEntry := func(height uint32,
		hash *chainhash.Hash) error {

		count++

		// Keep track of the prune tip height and hash.
		if height > pruneTipHeight {
			pruneTipHeight = height
			pruneTipHash = *hash
		}

		err := sqlDB.UpsertPruneLogEntry(
			ctx, sqlc.UpsertPruneLogEntryParams{
				BlockHeight: int64(height),
				BlockHash:   hash[:],
			},
		)
		if err != nil {
			return fmt.Errorf("unable to insert prune log "+
				"entry for height %d: %w", height, err)
		}

		// Now, check that the entry was inserted correctly.
		migratedHash, err := sqlDB.GetPruneHashByHeight(
			ctx, int64(height),
		)
		if err != nil {
			return fmt.Errorf("could not get prune hash "+
				"for height %d: %w", height, err)
		}

		return compare(hash[:], migratedHash, "prune log entry")
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(kvBackend,
		func(height uint32, hash *chainhash.Hash) error {
			err := migrateSinglePruneEntry(height, hash)
			if err != nil {
				return fmt.Errorf("could not migrate "+
					"prune log entry at height %d: %w",
					height, err)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// Check that the prune tip is set correctly in the SQL
	// database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// A sql.ErrNoRows error here is expected if no prune log
		// entries were migrated from the kvdb store. Otherwise, it's
		// an unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")
			return nil
		}
		// Fall-through to the next error check.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
		"tip is: height %d, hash: %s", count, pruneTipHeight,
		pruneTipHash)

	return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
	row sqlc.GetChannelBySCIDWithPoliciesRow,
	chain chainhash.Hash) (*models.ChannelEdgeInfo,
	*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

	node1, node2, err := buildNodeVertices(
		row.Node.PubKey, row.Node_2.PubKey,
	)
	if err != nil {
		return nil, nil, nil, err
	}

	edge, err := getAndBuildEdgeInfo(
		ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to build channel "+
			"info: %w", err)
	}

	dbPol1, dbPol2, err := extractChannelPolicies(row)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to extract channel "+
			"policies: %w", err)
	}

	policy1, policy2, err := getAndBuildChanPolicies(
		ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to build channel "+
			"policies: %w", err)
	}

	return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
	hash *chainhash.Hash) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		metaBucket := tx.ReadBucket(graphMetaBucket)
		if metaBucket == nil {
			return ErrGraphNotFound
		}

		pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
		if pruneBucket == nil {
			// The graph has never been pruned and so, there are no
			// entries to iterate over.
			return nil
		}

		return pruneBucket.ForEach(func(k, v []byte) error {
			blockHeight := byteOrder.Uint32(k)
			var blockHash chainhash.Hash
			copy(blockHash[:], v)

			return cb(blockHeight, &blockHash)
		})
	}, func() {})
}
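
// For illustration: a hypothetical helper showing the key/value layout that
// forEachPruneLogEntry reads back above, i.e. a 4-byte big-endian block
// height as the bucket key and the 32-byte block hash as the value. This
// helper does not exist in lnd; it is only a sketch of the assumed layout.
//
//	func encodePruneLogEntry(height uint32,
//		hash chainhash.Hash) (key, value []byte) {
//
//		key = make([]byte, 4)
//		byteOrder.PutUint32(key, height)
//
//		return key, hash[:]
//	}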

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var count uint64
	err := forEachClosedSCID(kvBackend,
		func(scid lnwire.ShortChannelID) error {
			count++

			chanIDB := channelIDToBytes(scid.ToUint64())
			err := sqlDB.InsertClosedChannel(ctx, chanIDB)
			if err != nil {
				return fmt.Errorf("could not insert closed "+
					"channel with SCID %s: %w", scid, err)
			}

			// Now, verify that the channel with the given SCID is
			// seen as closed.
			isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
			if err != nil {
				return fmt.Errorf("could not check if "+
					"channel %s is closed: %w", scid, err)
			}

			if !isClosed {
				return fmt.Errorf("channel %s should be "+
					"closed, but is not", scid)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

	return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var count uint64
	err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
		pubKey2 [33]byte) error {

		chanIDB := channelIDToBytes(chanID)

		// If it is in the closed SCID index, we don't need to
		// add it to the zombie index.
		//
		// NOTE: this means that the resulting zombie index count in
		// the SQL store may well be less than the count of zombie
		// channels in the KV store.
		isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not check closed "+
				"channel: %w", err)
		}
		if isClosed {
			return nil
		}

		count++

		err = sqlDB.UpsertZombieChannel(
			ctx, sqlc.UpsertZombieChannelParams{
				Version:  int16(ProtocolV1),
				Scid:     chanIDB,
				NodeKey1: pubKey1[:],
				NodeKey2: pubKey2[:],
			},
		)
		if err != nil {
			return fmt.Errorf("could not upsert zombie "+
				"channel %d: %w", chanID, err)
		}

		// Finally, verify that the channel is indeed marked as a
		// zombie channel.
		isZombie, err := sqlDB.IsZombieChannel(
			ctx, sqlc.IsZombieChannelParams{
				Version: int16(ProtocolV1),
				Scid:    chanIDB,
			},
		)
		if err != nil {
			return fmt.Errorf("could not check if "+
				"channel %d is zombie: %w", chanID, err)
		}

		if !isZombie {
			return fmt.Errorf("channel %d should be "+
				"a zombie, but is not", chanID)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Migrated %d zombie channels from KV to SQL", count)

	return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
	pubKey2 [33]byte) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		return zombieIndex.ForEach(func(k, v []byte) error {
			var pubKey1, pubKey2 [33]byte
			copy(pubKey1[:], v[:33])
			copy(pubKey2[:], v[33:])

			return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
		})
	}, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
	cb func(lnwire.ShortChannelID) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		closedScids := tx.ReadBucket(closedScidBucket)
		if closedScids == nil {
			return nil
		}

		return closedScids.ForEach(func(k, _ []byte) error {
			return cb(lnwire.NewShortChanIDFromInt(
				byteOrder.Uint64(k),
			))
		})
	}, func() {})
}

// compare checks if the original and migrated objects are equal. If they
// are not, it returns an error with a unified diff of the two objects.
func compare(original, migrated any, identifier string) error {
	if reflect.DeepEqual(original, migrated) {
		return nil
	}

	diff := difflib.UnifiedDiff{
		A:        difflib.SplitLines(spew.Sdump(original)),
		B:        difflib.SplitLines(spew.Sdump(migrated)),
		FromFile: "Expected",
		FromDate: "",
		ToFile:   "Actual",
		ToDate:   "",
		Context:  3,
	}
	diffText, _ := difflib.GetUnifiedDiffString(diff)

	return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
		diffText)
}
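
// For illustration: a hypothetical caller showing how a mismatch reported by
// compare can be detected, since every mismatch error wraps
// ErrMigrationMismatch. The origNode and migratedNode values below are
// placeholders used only for this sketch.
//
//	if err := compare(origNode, migratedNode, "node"); err != nil {
//		if errors.Is(err, ErrMigrationMismatch) {
//			// The error message includes a unified diff of the
//			// two objects to aid debugging.
//			log.Errorf("migration check failed: %v", err)
//		}
//		return err
//	}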