lightningnetwork / lnd, build 16643433757 (push via GitHub, committer: web-flow)
31 Jul 2025 08:03AM UTC coverage: 67.044% (-0.03% from 67.074%)

Merge pull request #10116 from ellemouton/graphPerf4

[3] graph/db: batch-fetch channel & policy data

0 of 281 new or added lines in 3 files covered (0.0%).
54 existing lines in 13 files now uncovered.
135555 of 202189 relevant lines covered (67.04%).
21624.12 hits per line.

Source file: /graph/db/sql_migration.go (0.0% of lines covered)
package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the Prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++
                chunk++

                // TODO(elle): At this point, we should check the loaded node
                // to see if we should extract any DNS addresses from its
                // opaque type addresses. This is expected to be done in:
                // https://github.com/lightningnetwork/lnd/pull/9455.
                // This TODO is being tracked in
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
                // must be addressed before making this code path active in
                // production.

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x)"+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
                        return cmp.Compare(i.String(), j.String())
                })
                slices.SortFunc(
                        migratedNode.Addresses, func(i, j net.Addr) int {
                                return cmp.Compare(i.String(), j.String())
                        },
                )

                err = sqldb.CompareRecords(
                        node, migratedNode, fmt.Sprintf("node %x", pub),
                )
                if err != nil {
                        return fmt.Errorf("node mismatch after migration "+
                                "for node %x: %w", pub, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
                                count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        log.Debugf("Migrating source node from KV to SQL")

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != chain {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash, chain)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                err = migrateSingleChannel(
                        ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d channels (%.2f channels/sec)",
                                channelCount, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}

func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
        channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, sqlDB, row, channel.ChainHash,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return sqldb.CompareRecords(
                        hash[:], migratedHash, "prune log entry",
                )
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        s.Do(func() {
                                elapsed := time.Since(t0).Seconds()
                                ratePerSec := float64(chunk) / elapsed
                                log.Debugf("Migrated %d prune log "+
                                        "entries (%.2f entries/sec)",
                                        count, ratePerSec)

                                t0 = time.Now()
                                chunk = 0
                        })

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL
        // database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // The ErrGraphNeverPruned error is expected if no prune log
                // entries were migrated from the kvdb store. Otherwise, it's
                // an unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow,
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.GraphNode.PubKey, row.GraphNode_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, db, chain, row.GraphChannel, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
                count++

                chanIDB := channelIDToBytes(scid.ToUint64())
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not insert closed channel "+
                                "with SCID %s: %w", scid, err)
                }

                // Now, verify that the channel with the given SCID is
                // seen as closed.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check if channel %s "+
                                "is closed: %w", scid, err)
                }

                if !isClosed {
                        return fmt.Errorf("channel %s should be closed, "+
                                "but is not", scid)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d closed scids "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }

        err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d zombie index entries "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}
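
Each migrate* helper above throttles its progress logging with rate.Sometimes from golang.org/x/time/rate, so the Debugf calls fire at most once per interval no matter how fast the loop spins. The stand-alone sketch below shows just that pattern in isolation; the loop, the counter, and the 2-second interval are illustrative choices and are not taken from lnd.

package main

import (
        "log"
        "time"

        "golang.org/x/time/rate"
)

func main() {
        // Allow the progress log to run immediately and then at most once
        // every 2 seconds (the migration code above uses 10 seconds).
        s := rate.Sometimes{Interval: 2 * time.Second}

        var count int
        for i := 0; i < 5_000_000; i++ {
                count++ // stand-in for migrating one record

                s.Do(func() {
                        log.Printf("processed %d records so far", count)
                })
        }

        log.Printf("done: processed %d records", count)
}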
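
forEachClosedSCID and forEachZombieEntry decode raw bucket keys with byteOrder.Uint64 before invoking their callbacks, and migrateClosedSCIDIndex re-encodes SCIDs with channelIDToBytes when writing to SQL. The sketch below illustrates that 8-byte key round-trip using only the standard library; it assumes byteOrder is big-endian (as in lnd's other KV stores), and the scidToKey/keyToSCID helper names are hypothetical rather than the package's own.

package main

import (
        "encoding/binary"
        "fmt"
)

// scidToKey encodes a short channel ID as an 8-byte big-endian key, the
// layout assumed here for the closed-SCID and zombie bucket keys.
func scidToKey(scid uint64) [8]byte {
        var k [8]byte
        binary.BigEndian.PutUint64(k[:], scid)
        return k
}

// keyToSCID decodes the 8-byte key back into the short channel ID.
func keyToSCID(k [8]byte) uint64 {
        return binary.BigEndian.Uint64(k[:])
}

func main() {
        const scid uint64 = 824038649032409088 // arbitrary example SCID

        key := scidToKey(scid)
        fmt.Printf("bucket key: %x\n", key)
        fmt.Printf("round-trip ok: %v\n", keyToSCID(key) == scid)
}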