lightningnetwork / lnd / 16626557198

30 Jul 2025 03:14PM UTC coverage: 67.054% (-0.2%) from 67.216%

Build 16626557198 (push, via github, committed by web-flow)
Merge pull request #9625 from MPins/issue-8161

Add deletecanceledinvoice RPC call
31 of 81 new or added lines in 4 files covered. (38.27%)

579 existing lines in 34 files now uncovered.

135260 of 201718 relevant lines covered (67.05%)

21683.86 hits per line

Source File: /graph/db/sql_migration.go (0.0% of lines covered)

package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. For now it is only
// exercised by tests, and it will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}
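
// Editorial sketch (not part of lnd): one way the migration above might be
// invoked once this code path is activated. The caller supplies the open KV
// backend, a SQLQueries implementation, and the chain hash lnd is running
// with. runGraphMigration and the one-hour bound are hypothetical.
func runGraphMigration(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        // Bound the migration so a stalled backend cannot block start-up
        // forever. The limit is an arbitrary placeholder, not a value taken
        // from lnd.
        ctx, cancel := context.WithTimeout(ctx, time.Hour)
        defer cancel()

        // The chain hash must match the hash stored with every channel in
        // the KV store, or the per-channel sanity check in
        // migrateChannelsAndPolicies will fail.
        return MigrateGraphToSQL(ctx, kvBackend, sqlDB, chain)
}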

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}
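
// Editorial sketch: checkGraphExists uses ErrGraphNotFound purely as a
// control-flow sentinel. Because errors.Is unwraps %w chains, the sentinel
// survives any wrapping that intermediate layers add, as the hypothetical
// helper below illustrates.
func graphMustExist(db kvdb.Backend) error {
        exists, err := checkGraphExists(db)
        if err != nil {
                // Wrapping with %w preserves the chain for errors.Is and
                // errors.As checks further up the stack.
                return fmt.Errorf("existence check failed: %w", err)
        }
        if !exists {
                return fmt.Errorf("cannot proceed: %w", ErrGraphNotFound)
        }

        return nil
}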

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++
                chunk++

                // TODO(elle): At this point, we should check the loaded node
                // to see if we should extract any DNS addresses from its
                // opaque type addresses. This is expected to be done in:
                // https://github.com/lightningnetwork/lnd/pull/9455.
                // This TODO is being tracked in
                // https://github.com/lightningnetwork/lnd/issues/9795 as this
                // must be addressed before making this code path active in
                // production.

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x) "+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
                        return cmp.Compare(i.String(), j.String())
                })
                slices.SortFunc(
                        migratedNode.Addresses, func(i, j net.Addr) int {
                                return cmp.Compare(i.String(), j.String())
                        },
                )

                err = sqldb.CompareRecords(
                        node, migratedNode, fmt.Sprintf("node %x", pub),
                )
                if err != nil {
                        return fmt.Errorf("node mismatch after migration "+
                                "for node %x: %w", pub, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
                                count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}
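
// Editorial sketch: the rate.Sometimes pattern used throughout this file.
// Do runs its callback at most once per Interval, so a tight migration loop
// can report progress without flooding the log. throttledLoop is a
// hypothetical stand-in for the real loops above.
func throttledLoop(items int) {
        s := rate.Sometimes{Interval: 10 * time.Second}
        for i := 0; i < items; i++ {
                // ... process item i here ...

                s.Do(func() {
                        log.Debugf("processed %d of %d items", i+1, items)
                })
        }
}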

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        log.Debugf("Migrating source node from KV to SQL")

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != chain {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash, chain)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                err = migrateSingleChannel(
                        ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d channels (%.2f channels/sec)",
                                channelCount, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                // No reset is needed since if a retry occurs, the entire
                // migration will be retried from the start.
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}
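
// Editorial sketch: migChanPolicy above is deliberately a closure so that
// the per-channel callback and the policy helper share one set of counters
// without package-level state. The hypothetical makeCounter shows the same
// pattern in miniature.
func makeCounter() (inc func(), total func() uint64) {
        var n uint64

        return func() { n++ }, func() uint64 { return n }
}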

// migrateSingleChannel migrates a single channel and its two policies from
// the KV backend to the SQL database, and then verifies the migrated records
// against the originals.
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
        channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.GraphNode.ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.GraphNode_2.ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, sqlDB, row, channel.ChainHash,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}
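
// Editorial sketch: why ExtraOpaqueData is normalised before comparison. A
// nil slice and an empty non-nil slice encode the same (absent) TLV stream,
// but a reflection-based deep comparison (which CompareRecords is assumed to
// perform) would report them as different, so both sides are normalised to
// nil first. normalizeOpaque is a hypothetical helper.
func normalizeOpaque(data []byte) []byte {
        if len(data) == 0 {
                return nil
        }

        return data
}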

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return sqldb.CompareRecords(
                        hash[:], migratedHash, "prune log entry",
                )
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        s.Do(func() {
                                elapsed := time.Since(t0).Seconds()
                                ratePerSec := float64(chunk) / elapsed
                                log.Debugf("Migrated %d prune log "+
                                        "entries (%.2f entries/sec)",
                                        count, ratePerSec)

                                t0 = time.Now()
                                chunk = 0
                        })

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL
        // database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // A sql.ErrNoRows result is expected if no prune log entries
                // were migrated from the kvdb store. Otherwise, it is an
                // unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}
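
// Editorial sketch: sql.ErrNoRows is not a failure above, it encodes "the
// graph was never pruned". The hypothetical helper below shows the same
// pattern in isolation: translate the sentinel into a boolean rather than
// propagating it as an error.
func hasPruneTip(ctx context.Context, sqlDB SQLQueries) (bool, error) {
        _, err := sqlDB.GetPruneTip(ctx)
        switch {
        case errors.Is(err, sql.ErrNoRows):
                return false, nil

        case err != nil:
                return false, err
        }

        return true, nil
}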

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow,
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.GraphNode.PubKey, row.GraphNode_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, db, chain, row.GraphChannel.ID, row.GraphChannel, node1,
                node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}
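
// Editorial sketch: the prune log layout assumed by the decoder above. The
// key is the block height encoded with the package's byteOrder and the
// value is the 32-byte block hash. encodePruneEntry is a hypothetical
// helper showing the inverse of forEachPruneLogEntry's decoding.
func encodePruneEntry(height uint32,
        hash chainhash.Hash) (key, value []byte) {

        key = make([]byte, 4)
        byteOrder.PutUint32(key, height)

        return key, hash[:]
}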

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
                count++

                chanIDB := channelIDToBytes(scid.ToUint64())
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not insert closed channel "+
                                "with SCID %s: %w", scid, err)
                }

                // Now, verify that the channel with the given SCID is
                // seen as closed.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check if channel %s "+
                                "is closed: %w", scid, err)
                }

                if !isClosed {
                        return fmt.Errorf("channel %s should be closed, "+
                                "but is not", scid)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d closed scids "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }

        err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}
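
// Editorial sketch: a short channel ID packs the funding block height,
// transaction index and output index into a single uint64, which is what
// the closed SCID index keys on. The round trip via
// lnwire.NewShortChanIDFromInt is lossless; scidComponents is a
// hypothetical helper.
func scidComponents(raw uint64) (height, txIdx uint32, outIdx uint16) {
        scid := lnwire.NewShortChanIDFromInt(raw)

        return scid.BlockHeight, scid.TxIndex, scid.TxPosition
}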

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d zombie index entries "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}
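
// Editorial sketch: the zombie index layout assumed by the decoder above.
// The value is simply the two 33-byte compressed public keys of the
// channel's nodes concatenated into 66 bytes, and the key is the channel ID
// encoded with byteOrder. encodeZombieEntry is a hypothetical helper
// showing the inverse.
func encodeZombieEntry(chanID uint64, pubKey1,
        pubKey2 [33]byte) (key, value []byte) {

        key = make([]byte, 8)
        byteOrder.PutUint64(key, chanID)

        value = make([]byte, 0, 66)
        value = append(value, pubKey1[:]...)
        value = append(value, pubKey2[:]...)

        return key, value
}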

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}