lightningnetwork / lnd, build 16305584530

15 Jul 2025 10:21PM UTC. Coverage: 67.321% (+0.01%) from 67.307%.

Push via GitHub (web-flow):
Merge pull request #10080 from ellemouton/graphPrefixTables

sqldb+graph/db: prefix graph SQL objects with "graph_"

0 of 211 new or added lines in 3 files covered (0.0%).
58 existing lines in 19 files are now uncovered.
135405 of 201132 relevant lines covered (67.32%).
21775.86 hits per line.

Source File: /graph/db/sql_migration.go (0.0% of lines covered)
package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the Prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++

                // TODO(elle): At this point, we should check the loaded node
                // to see if we should extract any DNS addresses from its
                // opaque type addresses. This is expected to be done in:
                // https://github.com/lightningnetwork/lnd/pull/9455.
                // This TODO is being tracked in
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
                // must be addressed before making this code path active in
                // production.

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x)"+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
                        return cmp.Compare(i.String(), j.String())
                })
                slices.SortFunc(
                        migratedNode.Addresses, func(i, j net.Addr) int {
                                return cmp.Compare(i.String(), j.String())
                        },
                )

                return sqldb.CompareRecords(
                        node, migratedNode, fmt.Sprintf("node %x", pub),
                )
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != chain {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash, chain)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                err = migrateSingleChannel(
                        ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}

func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
        channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, sqlDB, row, channel.ChainHash,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return sqldb.CompareRecords(
                        hash[:], migratedHash, "prune log entry",
                )
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL
        // database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // The ErrGraphNeverPruned error is expected if no prune log
                // entries were migrated from the kvdb store. Otherwise, it's
                // an unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow,
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.GraphNode.PubKey, row.GraphNode_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, db, chain, row.GraphChannel.ID, row.GraphChannel, node1,
                node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var count uint64
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
                count++

                chanIDB := channelIDToBytes(scid.ToUint64())
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not insert closed channel "+
                                "with SCID %s: %w", scid, err)
                }

                // Now, verify that the channel with the given SCID is
                // seen as closed.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check if channel %s "+
                                "is closed: %w", scid, err)
                }

                if !isClosed {
                        return fmt.Errorf("channel %s should be closed, "+
                                "but is not", scid)
                }

                return nil
        }

        err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var count uint64
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}
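
Usage sketch (not part of sql_migration.go above): the NOTE on MigrateGraphToSQL says the entry point is currently only exercised by tests. For orientation, the following hypothetical wrapper shows how a caller that has already opened a kvdb.Backend and a SQLQueries store, and knows the chain hash lnd is running with, might drive it. Only MigrateGraphToSQL's signature is taken from the listing; the wrapper name runGraphMigration and its error message are invented for illustration, and the snippet assumes it lives in the same graphdb package.

package graphdb

import (
        "context"
        "fmt"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/kvdb"
)

// runGraphMigration is a hypothetical helper: it forwards the already-opened
// KV backend, the SQL query store, and the active chain hash to
// MigrateGraphToSQL and wraps any failure with context.
func runGraphMigration(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        if err := MigrateGraphToSQL(ctx, kvBackend, sqlDB, chain); err != nil {
                return fmt.Errorf("graph KV to SQL migration failed: %w", err)
        }

        return nil
}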