
lightningnetwork / lnd / 16110323433

07 Jul 2025 07:10AM UTC coverage: 57.575%. First build 16110323433.

Pull Request #10038: [graph mig 3]: graph/db: migrate zombies, closed SCIDs, prune log from kvdb to SQL
Merge c1752330c into b067f7db7 (via GitHub web-flow)

0 of 229 new or added lines in 1 file covered (0.0%).
98477 of 171041 relevant lines covered (57.58%).
1.79 hits per line.

Source File: /graph/db/sql_migration.go (0.0% covered)

package graphdb

import (
	"bytes"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/sqldb/sqlc"
	"github.com/pmezard/go-difflib/difflib"
)

// ErrMigrationMismatch is returned when a migrated graph record does not match
// the original record.
var ErrMigrationMismatch = fmt.Errorf("migrated graph record does not match " +
	"original record")

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()

	// Check if there is a graph to migrate.
	graphExists, err := checkGraphExists(kvBackend)
	if err != nil {
		return fmt.Errorf("failed to check graph existence: %w", err)
	}
	if !graphExists {
		log.Infof("No graph found in KV store, skipping the migration")
		return nil
	}

	// 1) Migrate all the nodes.
	if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// 2) Migrate the source node.
	if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate source node: %w", err)
	}

	// 3) Migrate all the channels and channel policies.
	err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	// 4) Migrate the prune log.
	if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// 5) Migrate the closed SCID index.
	err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// 6) Migrate the zombie index.
	if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

	return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
	// Check if there is even a graph to migrate.
	err := db.View(func(tx kvdb.RTx) error {
		// Check for the existence of the node bucket which is a top
		// level bucket that would have been created on the initial
		// creation of the graph store.
		nodes := tx.ReadBucket(nodeBucket)
		if nodes == nil {
			return ErrGraphNotFound
		}

		return nil
	}, func() {})
	if errors.Is(err, ErrGraphNotFound) {
		return false, nil
	} else if err != nil {
		return false, fmt.Errorf("failed to check graph existence: %w",
			err)
	}

	return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	// Keep track of the number of nodes migrated and the number of
	// nodes skipped due to errors.
	var (
		count   uint64
		skipped uint64
	)

	// Loop through each node in the KV store and insert it into the SQL
	// database.
	err := forEachNode(kvBackend, func(_ kvdb.RTx,
		node *models.LightningNode) error {

		pub := node.PubKeyBytes

		// Sanity check to ensure that the node has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a node with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skipped++
			log.Warnf("Skipping migration of node %x with invalid "+
				"extra opaque data: %v", pub,
				node.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra "+
				"opaque data for node %x: %w", pub, err)
		}

		count++

		// TODO(elle): At this point, we should check the loaded node
		// to see if we should extract any DNS addresses from its
		// opaque type addresses. This is expected to be done in:
		// https://github.com/lightningnetwork/lnd/pull/9455.
		// This TODO is being tracked in
		// https://github.com/lightningnetwork/lnd/issues/9795 as this
		// must be addressed before making this code path active in
		// production.

		// Write the node to the SQL database.
		id, err := upsertNode(ctx, sqlDB, node)
		if err != nil {
			return fmt.Errorf("could not persist node(%x): %w", pub,
				err)
		}

		// Fetch it from the SQL store and compare it against the
		// original node object to ensure the migration was successful.
		dbNode, err := sqlDB.GetNodeByPubKey(
			ctx, sqlc.GetNodeByPubKeyParams{
				PubKey:  node.PubKeyBytes[:],
				Version: int16(ProtocolV1),
			},
		)
		if err != nil {
			return fmt.Errorf("could not get node by pubkey (%x) "+
				"after migration: %w", pub, err)
		}

		// Sanity check: ensure the migrated node ID matches the one we
		// just inserted.
		if dbNode.ID != id {
			return fmt.Errorf("node ID mismatch for node (%x) "+
				"after migration: expected %d, got %d",
				pub, id, dbNode.ID)
		}

		migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
		if err != nil {
			return fmt.Errorf("could not build migrated node "+
				"from dbNode(db id: %d, node pub: %x): %w",
				dbNode.ID, pub, err)
		}

		// Make sure that the node addresses are sorted before
		// comparing them to ensure that the order of addresses does
		// not affect the comparison.
		sort.Slice(node.Addresses, func(i, j int) bool {
			return node.Addresses[i].String() <
				node.Addresses[j].String()
		})
		sort.Slice(migratedNode.Addresses, func(i, j int) bool {
			return migratedNode.Addresses[i].String() <
				migratedNode.Addresses[j].String()
		})

		return compare(node, migratedNode, fmt.Sprintf("node %x", pub))
	})
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
		"invalid TLV streams)", count, skipped)

	return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
	sqlDB SQLQueries) error {

	sourceNode, err := sourceNode(kvdb)
	if errors.Is(err, ErrSourceNodeNotSet) {
		// If the source node has not been set yet, we can skip this
		// migration step.
		return nil
	} else if err != nil {
		return fmt.Errorf("could not get source node from kv "+
			"store: %w", err)
	}

	pub := sourceNode.PubKeyBytes

	// Get the DB ID of the source node by its public key. This node must
	// already exist in the SQL database, as it should have been migrated
	// in the previous node-migration step.
	id, err := sqlDB.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			PubKey:  pub[:],
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get source node ID: %w", err)
	}

	// Now we can add the source node to the SQL database.
	err = sqlDB.AddSourceNode(ctx, id)
	if err != nil {
		return fmt.Errorf("could not add source node to SQL store: %w",
			err)
	}

	// Verify that the source node was added correctly by fetching it back
	// from the SQL database and checking that the expected DB ID and
	// pub key are returned. We don't need to do a whole node comparison
	// here, as this was already done in the previous migration step.
	srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
	if err != nil {
		return fmt.Errorf("could not get source nodes from SQL "+
			"store: %w", err)
	}

	// The SQL store has support for multiple source nodes (for future
	// protocol versions) but this migration is purely aimed at the V1
	// store, and so we expect exactly one source node to be present.
	if len(srcNodes) != 1 {
		return fmt.Errorf("expected exactly one source node, "+
			"got %d", len(srcNodes))
	}

	// Check that the source node ID and pub key match the original
	// source node.
	if srcNodes[0].NodeID != id {
		return fmt.Errorf("source node ID mismatch after migration: "+
			"expected %d, got %d", id, srcNodes[0].NodeID)
	}
	err = compare(pub[:], srcNodes[0].PubKey, "source node")
	if err != nil {
		return fmt.Errorf("source node pubkey mismatch after "+
			"migration: %w", err)
	}

	log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

	return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	var (
		channelCount       uint64
		skippedChanCount   uint64
		policyCount        uint64
		skippedPolicyCount uint64
	)
	migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
		// If the policy is nil, we can skip it.
		if policy == nil {
			return nil
		}

		// Sanity check to ensure that the policy has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a policy with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(policy.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skippedPolicyCount++
			log.Warnf("Skipping policy for channel %d with "+
				"invalid extra opaque data: %v",
				policy.ChannelID, policy.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data: %w. %+v", err, policy.ExtraOpaqueData)
		}

		policyCount++

		_, _, _, err = updateChanEdgePolicy(ctx, sqlDB, policy)
		if err != nil {
			return fmt.Errorf("could not migrate channel "+
				"policy %d: %w", policy.ChannelID, err)
		}

		return nil
	}

	// Iterate over each channel in the KV store and migrate it and its
	// policies to the SQL database.
	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
		policy1 *models.ChannelEdgePolicy,
		policy2 *models.ChannelEdgePolicy) error {

		scid := channel.ChannelID

		// Here, we do a sanity check to ensure that the chain hash of
		// the channel returned by the KV store matches the expected
		// chain hash. This is important since in the SQL store, we will
		// no longer explicitly store the chain hash in the channel
		// info, but rather rely on the chain hash LND is running with.
		// So this is our way of ensuring that LND is running on the
		// correct network at migration time.
		if channel.ChainHash != chain {
			return fmt.Errorf("channel %d has chain hash %s, "+
				"expected %s", scid, channel.ChainHash, chain)
		}

		// Sanity check to ensure that the channel has valid extra
		// opaque data. If it does not, we'll skip it. We need to do
		// this because previously we would just persist any TLV bytes
		// that we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a channel with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			log.Warnf("Skipping channel %d with invalid "+
				"extra opaque data: %v", scid,
				channel.ExtraOpaqueData)

			skippedChanCount++

			// If we skip a channel, we also skip its policies.
			if policy1 != nil {
				skippedPolicyCount++
			}
			if policy2 != nil {
				skippedPolicyCount++
			}

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data for channel %d: %w %v", scid, err,
				channel.ExtraOpaqueData)
		}

		channelCount++
		err = migrateSingleChannel(
			ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
		)
		if err != nil {
			return fmt.Errorf("could not migrate channel %d: %w",
				scid, err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	log.Infof("Migrated %d channels and %d policies from KV to SQL "+
		"(skipped %d channels and %d policies due to invalid TLV "+
		"streams)", channelCount, policyCount, skippedChanCount,
		skippedPolicyCount)

	return nil
}

// migrateSingleChannel migrates a single channel and its two policies from the
// KV backend to the SQL database, and then verifies that the migrated records
// match the originals.
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
	channel *models.ChannelEdgeInfo,
	policy1, policy2 *models.ChannelEdgePolicy,
	migChanPolicy func(*models.ChannelEdgePolicy) error) error {

	scid := channel.ChannelID

	// First, migrate the channel info along with its policies.
	dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
	if err != nil {
		return fmt.Errorf("could not insert record for channel %d "+
			"in SQL store: %w", scid, err)
	}

	// Now, migrate the two channel policies.
	err = migChanPolicy(policy1)
	if err != nil {
		return fmt.Errorf("could not migrate policy1(%d): %w", scid,
			err)
	}
	err = migChanPolicy(policy2)
	if err != nil {
		return fmt.Errorf("could not migrate policy2(%d): %w", scid,
			err)
	}

	// Now, fetch the channel and its policies from the SQL DB.
	row, err := sqlDB.GetChannelBySCIDWithPolicies(
		ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
			Scid:    channelIDToBytes(scid),
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
			err)
	}

	// Assert that the DB IDs for the channel and nodes are as expected
	// given the inserted channel info.
	err = compare(dbChanInfo.channelID, row.Channel.ID, "channel DB ID")
	if err != nil {
		return err
	}
	err = compare(dbChanInfo.node1ID, row.Node.ID, "node1 DB ID")
	if err != nil {
		return err
	}
	err = compare(dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID")
	if err != nil {
		return err
	}

	migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
		ctx, sqlDB, row, channel.ChainHash,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel and "+
			"policies: %w", err)
	}

	// Finally, compare the original channel info and
	// policies with the migrated ones to ensure they match.
	if len(channel.ExtraOpaqueData) == 0 {
		channel.ExtraOpaqueData = nil
	}
	if len(migChan.ExtraOpaqueData) == 0 {
		migChan.ExtraOpaqueData = nil
	}

	err = compare(channel, migChan, fmt.Sprintf("channel %d", scid))
	if err != nil {
		return err
	}

	checkPolicy := func(expPolicy,
		migPolicy *models.ChannelEdgePolicy) error {

		switch {
		// Both policies are nil, nothing to compare.
		case expPolicy == nil && migPolicy == nil:
			return nil

		// One of the policies is nil, but the other is not.
		case expPolicy == nil || migPolicy == nil:
			return fmt.Errorf("expected both policies to be "+
				"non-nil. Got expPolicy: %v, "+
				"migPolicy: %v", expPolicy, migPolicy)

		// Both policies are non-nil, we can compare them.
		default:
		}

		if len(expPolicy.ExtraOpaqueData) == 0 {
			expPolicy.ExtraOpaqueData = nil
		}
		if len(migPolicy.ExtraOpaqueData) == 0 {
			migPolicy.ExtraOpaqueData = nil
		}

		return compare(*expPolicy, *migPolicy, "channel policy")
	}

	err = checkPolicy(policy1, migPol1)
	if err != nil {
		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
			err)
	}

	err = checkPolicy(policy2, migPol2)
	if err != nil {
		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
			err)
	}

	return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var (
		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash
	)

	// migrateSinglePruneEntry is a helper function that inserts a single
	// prune log entry into the SQL database and verifies that it was
	// inserted correctly.
	migrateSinglePruneEntry := func(height uint32,
		hash *chainhash.Hash) error {

		count++

		// Keep track of the prune tip height and hash.
		if height > pruneTipHeight {
			pruneTipHeight = height
			pruneTipHash = *hash
		}

		err := sqlDB.UpsertPruneLogEntry(
			ctx, sqlc.UpsertPruneLogEntryParams{
				BlockHeight: int64(height),
				BlockHash:   hash[:],
			},
		)
		if err != nil {
			return fmt.Errorf("unable to insert prune log "+
				"entry for height %d: %w", height, err)
		}

		// Now, check that the entry was inserted correctly.
		migratedHash, err := sqlDB.GetPruneHashByHeight(
			ctx, int64(height),
		)
		if err != nil {
			return fmt.Errorf("could not get prune hash "+
				"for height %d: %w", height, err)
		}

		return compare(hash[:], migratedHash, "prune log entry")
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(kvBackend,
		func(height uint32, hash *chainhash.Hash) error {
			err := migrateSinglePruneEntry(height, hash)
			if err != nil {
				return fmt.Errorf("could not migrate "+
					"prune log entry at height %d: %w",
					height, err)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// Check that the prune tip is set correctly in the SQL
	// database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// The ErrGraphNeverPruned error is expected if no prune log
		// entries were migrated from the kvdb store. Otherwise, it's
		// an unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")
			return nil
		}
		// Fall-through to the next error check.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
		"tip is: height %d, hash: %s", count, pruneTipHeight,
		pruneTipHash)

	return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
	row sqlc.GetChannelBySCIDWithPoliciesRow,
	chain chainhash.Hash) (*models.ChannelEdgeInfo,
	*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

	node1, node2, err := buildNodeVertices(
		row.Node.PubKey, row.Node_2.PubKey,
	)
	if err != nil {
		return nil, nil, nil, err
	}

	edge, err := getAndBuildEdgeInfo(
		ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to build channel "+
			"info: %w", err)
	}

	dbPol1, dbPol2, err := extractChannelPolicies(row)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to extract channel "+
			"policies: %w", err)
	}

	policy1, policy2, err := getAndBuildChanPolicies(
		ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to build channel "+
			"policies: %w", err)
	}

	return edge, policy1, policy2, nil
}

682
// forEachPruneLogEntry iterates over each prune log entry in the KV
683
// backend and calls the provided callback function for each entry.
684
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
NEW
685
        hash *chainhash.Hash) error) error {
×
NEW
686

×
NEW
687
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
688
                metaBucket := tx.ReadBucket(graphMetaBucket)
×
NEW
689
                if metaBucket == nil {
×
NEW
690
                        return ErrGraphNotFound
×
NEW
691
                }
×
692

NEW
693
                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
×
NEW
694
                if pruneBucket == nil {
×
NEW
695
                        // The graph has never been pruned and so, there are no
×
NEW
696
                        // entries to iterate over.
×
NEW
697
                        return nil
×
NEW
698
                }
×
699

NEW
700
                return pruneBucket.ForEach(func(k, v []byte) error {
×
NEW
701
                        blockHeight := byteOrder.Uint32(k)
×
NEW
702
                        var blockHash chainhash.Hash
×
NEW
703
                        copy(blockHash[:], v)
×
NEW
704

×
NEW
705
                        return cb(blockHeight, &blockHash)
×
NEW
706
                })
×
NEW
707
        }, func() {})
×
708
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var count uint64
	err := forEachClosedSCID(kvBackend,
		func(scid lnwire.ShortChannelID) error {
			count++

			chanIDB := channelIDToBytes(scid.ToUint64())
			err := sqlDB.InsertClosedChannel(ctx, chanIDB)
			if err != nil {
				return fmt.Errorf("could not insert closed "+
					"channel with SCID %s: %w", scid, err)
			}

			// Now, verify that the channel with the given SCID is
			// seen as closed.
			isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
			if err != nil {
				return fmt.Errorf("could not check if "+
					"channel %s is closed: %w", scid, err)
			}

			if !isClosed {
				return fmt.Errorf("channel %s should be "+
					"closed, but is not", scid)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

	return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var count uint64
	err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
		pubKey2 [33]byte) error {

		chanIDB := channelIDToBytes(chanID)

		// If it is in the closed SCID index, we don't need to
		// add it to the zombie index.
		//
		// NOTE: this means that the resulting zombie index count in
		// the SQL store may well be less than the count of zombie
		// channels in the KV store.
		isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not check closed "+
				"channel: %w", err)
		}
		if isClosed {
			return nil
		}

		count++

		err = sqlDB.UpsertZombieChannel(
			ctx, sqlc.UpsertZombieChannelParams{
				Version:  int16(ProtocolV1),
				Scid:     chanIDB,
				NodeKey1: pubKey1[:],
				NodeKey2: pubKey2[:],
			},
		)
		if err != nil {
			return fmt.Errorf("could not upsert zombie "+
				"channel %d: %w", chanID, err)
		}

		// Finally, verify that the channel is indeed marked as a
		// zombie channel.
		isZombie, err := sqlDB.IsZombieChannel(
			ctx, sqlc.IsZombieChannelParams{
				Version: int16(ProtocolV1),
				Scid:    chanIDB,
			},
		)
		if err != nil {
			return fmt.Errorf("could not check if "+
				"channel %d is zombie: %w", chanID, err)
		}

		if !isZombie {
			return fmt.Errorf("channel %d should be "+
				"a zombie, but is not", chanID)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Migrated %d zombie channels from KV to SQL", count)

	return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
	pubKey2 [33]byte) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		return zombieIndex.ForEach(func(k, v []byte) error {
			var pubKey1, pubKey2 [33]byte
			copy(pubKey1[:], v[:33])
			copy(pubKey2[:], v[33:])

			return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
		})
	}, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
	cb func(lnwire.ShortChannelID) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		closedScids := tx.ReadBucket(closedScidBucket)
		if closedScids == nil {
			return nil
		}

		return closedScids.ForEach(func(k, _ []byte) error {
			return cb(lnwire.NewShortChanIDFromInt(
				byteOrder.Uint64(k),
			))
		})
	}, func() {})
}

// compare checks if the original and migrated objects are equal. If they
// are not, it returns an error with a unified diff of the two objects.
func compare(original, migrated any, identifier string) error {
	if reflect.DeepEqual(original, migrated) {
		return nil
	}

	diff := difflib.UnifiedDiff{
		A:        difflib.SplitLines(spew.Sdump(original)),
		B:        difflib.SplitLines(spew.Sdump(migrated)),
		FromFile: "Expected",
		FromDate: "",
		ToFile:   "Actual",
		ToDate:   "",
		Context:  3,
	}
	diffText, _ := difflib.GetUnifiedDiffString(diff)

	return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
		diffText)
}
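
The NOTE on MigrateGraphToSQL above states that the migration is currently exercised only from tests. As a rough illustration of how a test might drive it, here is a minimal sketch: openTestKVStore and openTestSQLStore are hypothetical fixtures standing in for whatever helpers the real test suite provides, and only MigrateGraphToSQL, kvdb.Backend, SQLQueries, and the chainhash.Hash argument come from the file above.

package graphdb

import (
	"context"
	"testing"

	"github.com/btcsuite/btcd/chaincfg"
)

// TestMigrateGraphToSQLSketch is an illustrative sketch only: the two
// openTest* helpers below are assumed fixtures, not part of the file above.
func TestMigrateGraphToSQLSketch(t *testing.T) {
	ctx := context.Background()

	// Assumed fixtures: a kvdb.Backend holding a populated graph store and
	// an empty SQL store implementing SQLQueries.
	kvBackend := openTestKVStore(t)
	sqlDB := openTestSQLStore(t)

	// The chain hash must match the network the kvdb graph was written
	// for, since the migration rejects channels recorded under any other
	// chain.
	chain := *chaincfg.RegressionNetParams.GenesisHash

	if err := MigrateGraphToSQL(ctx, kvBackend, sqlDB, chain); err != nil {
		t.Fatalf("graph migration failed: %v", err)
	}
}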