lightningnetwork / lnd / build 16930624683
13 Aug 2025 07:31AM UTC · coverage: 66.908% (+10.0%) from 56.955%
Pull Request #10148: graph/db+sqldb: different defaults for SQLite and Postgres query options
Merge faa71c073 into 8810793e6 (github / web-flow)

11 of 80 new or added lines in 7 files covered (13.75%).
62 existing lines in 9 files are now uncovered.
135807 of 202975 relevant lines covered (66.91%).
21557.94 hits per line.

Source File: /graph/db/sql_migration.go (file coverage: 0.0%)

package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        if err := migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the Prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++
                chunk++

                // TODO(elle): At this point, we should check the loaded node
                // to see if we should extract any DNS addresses from its
                // opaque type addresses. This is expected to be done in:
                // https://github.com/lightningnetwork/lnd/pull/9455.
                // This TODO is being tracked in
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
                // must be addressed before making this code path active in
                // production.

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x)"+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, cfg, sqlDB, dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
                        return cmp.Compare(i.String(), j.String())
                })
                slices.SortFunc(
                        migratedNode.Addresses, func(i, j net.Addr) int {
                                return cmp.Compare(i.String(), j.String())
                        },
                )

                err = sqldb.CompareRecords(
                        node, migratedNode, fmt.Sprintf("node %x", pub),
                )
                if err != nil {
                        return fmt.Errorf("node mismatch after migration "+
                                "for node %x: %w", pub, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
                                count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        log.Debugf("Migrating source node from KV to SQL")

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != cfg.ChainHash {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash,
                                cfg.ChainHash)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                chunk++

                err = migrateSingleChannel(
                        ctx, cfg, sqlDB, channel, policy1, policy2,
                        migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d channels (%.2f channels/sec)",
                                channelCount, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}

func migrateSingleChannel(ctx context.Context, cfg *SQLStoreConfig,
        sqlDB SQLQueries, channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, cfg, sqlDB, row,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++
                chunk++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return sqldb.CompareRecords(
                        hash[:], migratedHash, "prune log entry",
                )
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        s.Do(func() {
                                elapsed := time.Since(t0).Seconds()
                                ratePerSec := float64(chunk) / elapsed
                                log.Debugf("Migrated %d prune log "+
                                        "entries (%.2f entries/sec)",
                                        count, ratePerSec)

                                t0 = time.Now()
                                chunk = 0
                        })

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL
        // database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // The ErrGraphNeverPruned error is expected if no prune log
                // entries were migrated from the kvdb store. Otherwise, it's
                // an unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
        db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.GraphNode.PubKey, row.GraphNode_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, cfg, db, row.GraphChannel, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, node1,
                node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
                count++
                chunk++

                chanIDB := channelIDToBytes(scid.ToUint64())
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not insert closed channel "+
                                "with SCID %s: %w", scid, err)
                }

                // Now, verify that the channel with the given SCID is
                // seen as closed.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check if channel %s "+
                                "is closed: %w", scid, err)
                }

                if !isClosed {
                        return fmt.Errorf("channel %s should be closed, "+
                                "but is not", scid)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d closed scids "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }

        err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++
                chunk++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d zombie index entries "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}
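
For context, here is a minimal sketch of how the MigrateGraphToSQL entry point above might be wired up once it is called from the main binary (per its doc comment, it is currently exercised only by tests). The openKVGraphBackend and newSQLGraphQueries constructors are hypothetical placeholders, not part of the lnd API; only the final call mirrors the signature defined in this file.

// runGraphMigration is a hypothetical wiring sketch. openKVGraphBackend and
// newSQLGraphQueries are placeholder constructors (assumptions, not real lnd
// APIs); only the MigrateGraphToSQL call reflects the signature above.
func runGraphMigration(ctx context.Context, cfg *SQLStoreConfig) error {
        // Open the existing KV graph store (placeholder constructor).
        kvBackend, err := openKVGraphBackend()
        if err != nil {
                return fmt.Errorf("could not open KV graph store: %w", err)
        }
        defer kvBackend.Close()

        // Obtain a SQLQueries implementation backed by the target SQL
        // database (placeholder constructor).
        sqlDB, err := newSQLGraphQueries()
        if err != nil {
                return fmt.Errorf("could not open SQL graph store: %w", err)
        }

        // Run the one-shot KV-to-SQL migration defined above.
        return MigrateGraphToSQL(ctx, cfg, kvBackend, sqlDB)
}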