lightningnetwork / lnd — build 16796616815
07 Aug 2025 06:12AM UTC — coverage: 66.9% (+0.03%) from 66.868%

Pull Request #10129: [8] graph/db: use batch loading for various graph SQL methods
Merge 522e200c0 into e5359f2f5 (github / web-flow)

6 of 332 new or added lines in 4 files covered. (1.81%)
118 existing lines in 23 files now uncovered.
135673 of 202800 relevant lines covered (66.9%)
21550.39 hits per line

Source file: /graph/db/sql_migration.go — 0.0% of its relevant lines covered
package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the Prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++
                chunk++

                // TODO(elle): At this point, we should check the loaded node
                // to see if we should extract any DNS addresses from its
                // opaque type addresses. This is expected to be done in:
                // https://github.com/lightningnetwork/lnd/pull/9455.
                // This TODO is being tracked in
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
                // must be addressed before making this code path active in
                // production.

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x)"+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, sqlDB, dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
                        return cmp.Compare(i.String(), j.String())
                })
                slices.SortFunc(
                        migratedNode.Addresses, func(i, j net.Addr) int {
                                return cmp.Compare(i.String(), j.String())
                        },
                )

                err = sqldb.CompareRecords(
                        node, migratedNode, fmt.Sprintf("node %x", pub),
                )
                if err != nil {
                        return fmt.Errorf("node mismatch after migration "+
                                "for node %x: %w", pub, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
                                count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        log.Debugf("Migrating source node from KV to SQL")

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != chain {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash, chain)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                chunk++

                err = migrateSingleChannel(
                        ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d channels (%.2f channels/sec)",
                                channelCount, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}

// migrateSingleChannel migrates a single channel and its two policies from
// the KV backend to the SQL database, and then verifies that the migrated
// records match the original ones.
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
        channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, sqlDB, row, channel.ChainHash,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return sqldb.CompareRecords(
                        hash[:], migratedHash, "prune log entry",
                )
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        s.Do(func() {
                                elapsed := time.Since(t0).Seconds()
                                ratePerSec := float64(chunk) / elapsed
                                log.Debugf("Migrated %d prune log "+
                                        "entries (%.2f entries/sec)",
                                        count, ratePerSec)

                                t0 = time.Now()
                                chunk = 0
                        })

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL
        // database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // The ErrGraphNeverPruned error is expected if no prune log
                // entries were migrated from the kvdb store. Otherwise, it's
                // an unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow,
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.GraphNode.PubKey, row.GraphNode_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, db, chain, row.GraphChannel, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
                count++

                chanIDB := channelIDToBytes(scid.ToUint64())
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not insert closed channel "+
                                "with SCID %s: %w", scid, err)
                }

                // Now, verify that the channel with the given SCID is
                // seen as closed.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check if channel %s "+
                                "is closed: %w", scid, err)
                }

                if !isClosed {
                        return fmt.Errorf("channel %s should be closed, "+
                                "but is not", scid)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d closed scids "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }

        err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d zombie index entries "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}
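
The file above only defines the migration; per its own NOTE, nothing calls it yet outside of tests. The sketch below is a hypothetical illustration (not part of the lnd source) of how a caller that has already opened both stores might drive the exported entry point. Only MigrateGraphToSQL, SQLQueries, kvdb.Backend and the chain-hash check are taken from the listing; the package name, the wrapper function and the choice of the mainnet genesis hash are assumptions made for illustration.

// Package migrationsketch is a hypothetical, minimal driver for the graph
// KV-to-SQL migration shown above. It is a sketch, not lnd code.
package migrationsketch

import (
        "context"
        "fmt"

        "github.com/btcsuite/btcd/chaincfg"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/kvdb"
)

// MigrateMainnetGraph runs the KV-to-SQL graph migration for a mainnet node.
// The caller is responsible for opening (and later closing) both stores; how
// that is done is outside the scope of the listing above.
func MigrateMainnetGraph(ctx context.Context, kv kvdb.Backend,
        db graphdb.SQLQueries) error {

        // MigrateGraphToSQL sanity-checks the chain hash of every migrated
        // channel, so the hash passed here must match the network the KV
        // graph was built on. Here we assume mainnet.
        chain := *chaincfg.MainNetParams.GenesisHash

        if err := graphdb.MigrateGraphToSQL(ctx, kv, db, chain); err != nil {
                return fmt.Errorf("graph KV-to-SQL migration failed: %w", err)
        }

        return nil
}

A test harness would look much the same, simply constructing the KV backend and SQLQueries implementation from whatever fixtures the test suite provides before calling the wrapper.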