lightningnetwork / lnd — build 16215965260 (push via GitHub, web-flow)
11 Jul 2025 08:52AM UTC — coverage: 67.325% (+9.7%) from 57.653%

Merge pull request #10038 from ellemouton/graphMig3-indexes
[graph mig 3]: graph/db: migrate zombies, closed SCIDs, prune log from kvdb to SQL

0 of 237 new or added lines in 2 files covered (0.0%).
30 existing lines in 9 files now uncovered.
135271 of 200923 relevant lines covered (67.32%).
21761.85 hits per line.

Source File: /graph/db/sql_migration.go (file coverage: 0.0%)
package graphdb

import (
	"bytes"
	"cmp"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net"
	"slices"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/sqldb"
	"github.com/lightningnetwork/lnd/sqldb/sqlc"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()

	// Check if there is a graph to migrate.
	graphExists, err := checkGraphExists(kvBackend)
	if err != nil {
		return fmt.Errorf("failed to check graph existence: %w", err)
	}
	if !graphExists {
		log.Infof("No graph found in KV store, skipping the migration")
		return nil
	}

	// 1) Migrate all the nodes.
	if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// 2) Migrate the source node.
	if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate source node: %w", err)
	}

	// 3) Migrate all the channels and channel policies.
	err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	// 4) Migrate the Prune log.
	if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// 5) Migrate the closed SCID index.
	err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// 6) Migrate the zombie index.
	if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

	return nil
}

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
	// Check if there is even a graph to migrate.
	err := db.View(func(tx kvdb.RTx) error {
		// Check for the existence of the node bucket which is a top
		// level bucket that would have been created on the initial
		// creation of the graph store.
		nodes := tx.ReadBucket(nodeBucket)
		if nodes == nil {
			return ErrGraphNotFound
		}

		return nil
	}, func() {})
	if errors.Is(err, ErrGraphNotFound) {
		return false, nil
	} else if err != nil {
		return false, err
	}

	return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	// Keep track of the number of nodes migrated and the number of
	// nodes skipped due to errors.
	var (
		count   uint64
		skipped uint64
	)

	// Loop through each node in the KV store and insert it into the SQL
	// database.
	err := forEachNode(kvBackend, func(_ kvdb.RTx,
		node *models.LightningNode) error {

		pub := node.PubKeyBytes

		// Sanity check to ensure that the node has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a node with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skipped++
			log.Warnf("Skipping migration of node %x with invalid "+
				"extra opaque data: %v", pub,
				node.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra "+
				"opaque data for node %x: %w", pub, err)
		}

		count++

		// TODO(elle): At this point, we should check the loaded node
		// to see if we should extract any DNS addresses from its
		// opaque type addresses. This is expected to be done in:
		// https://github.com/lightningnetwork/lnd/pull/9455.
		// This TODO is being tracked in
		//  https://github.com/lightningnetwork/lnd/issues/9795 as this
		// must be addressed before making this code path active in
		// production.

		// Write the node to the SQL database.
		id, err := upsertNode(ctx, sqlDB, node)
		if err != nil {
			return fmt.Errorf("could not persist node(%x): %w", pub,
				err)
		}

		// Fetch it from the SQL store and compare it against the
		// original node object to ensure the migration was successful.
		dbNode, err := sqlDB.GetNodeByPubKey(
			ctx, sqlc.GetNodeByPubKeyParams{
				PubKey:  node.PubKeyBytes[:],
				Version: int16(ProtocolV1),
			},
		)
		if err != nil {
			return fmt.Errorf("could not get node by pubkey (%x)"+
				"after migration: %w", pub, err)
		}

		// Sanity check: ensure the migrated node ID matches the one we
		// just inserted.
		if dbNode.ID != id {
			return fmt.Errorf("node ID mismatch for node (%x) "+
				"after migration: expected %d, got %d",
				pub, id, dbNode.ID)
		}

		migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
		if err != nil {
			return fmt.Errorf("could not build migrated node "+
				"from dbNode(db id: %d, node pub: %x): %w",
				dbNode.ID, pub, err)
		}

		// Make sure that the node addresses are sorted before
		// comparing them to ensure that the order of addresses does
		// not affect the comparison.
		slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
			return cmp.Compare(i.String(), j.String())
		})
		slices.SortFunc(
			migratedNode.Addresses, func(i, j net.Addr) int {
				return cmp.Compare(i.String(), j.String())
			},
		)

		return sqldb.CompareRecords(
			node, migratedNode, fmt.Sprintf("node %x", pub),
		)
	})
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
		"invalid TLV streams)", count, skipped)

	return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
	sqlDB SQLQueries) error {

	sourceNode, err := sourceNode(kvdb)
	if errors.Is(err, ErrSourceNodeNotSet) {
		// If the source node has not been set yet, we can skip this
		// migration step.
		return nil
	} else if err != nil {
		return fmt.Errorf("could not get source node from kv "+
			"store: %w", err)
	}

	pub := sourceNode.PubKeyBytes

	// Get the DB ID of the source node by its public key. This node must
	// already exist in the SQL database, as it should have been migrated
	// in the previous node-migration step.
	id, err := sqlDB.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			PubKey:  pub[:],
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get source node ID: %w", err)
	}

	// Now we can add the source node to the SQL database.
	err = sqlDB.AddSourceNode(ctx, id)
	if err != nil {
		return fmt.Errorf("could not add source node to SQL store: %w",
			err)
	}

	// Verify that the source node was added correctly by fetching it back
	// from the SQL database and checking that the expected DB ID and
	// pub key are returned. We don't need to do a whole node comparison
	// here, as this was already done in the previous migration step.
	srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
	if err != nil {
		return fmt.Errorf("could not get source nodes from SQL "+
			"store: %w", err)
	}

	// The SQL store has support for multiple source nodes (for future
	// protocol versions) but this migration is purely aimed at the V1
	// store, and so we expect exactly one source node to be present.
	if len(srcNodes) != 1 {
		return fmt.Errorf("expected exactly one source node, "+
			"got %d", len(srcNodes))
	}

	// Check that the source node ID and pub key match the original
	// source node.
	if srcNodes[0].NodeID != id {
		return fmt.Errorf("source node ID mismatch after migration: "+
			"expected %d, got %d", id, srcNodes[0].NodeID)
	}
	err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
	if err != nil {
		return fmt.Errorf("source node pubkey mismatch after "+
			"migration: %w", err)
	}

	log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

	return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	var (
		channelCount       uint64
		skippedChanCount   uint64
		policyCount        uint64
		skippedPolicyCount uint64
	)
	migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
		// If the policy is nil, we can skip it.
		if policy == nil {
			return nil
		}

		// Unlike the special case of invalid TLV bytes for node and
		// channel announcements, we don't need to handle the case for
		// channel policies here because it is already handled in the
		// `forEachChannel` function. If the policy has invalid TLV
		// bytes, then `nil` will be passed to this function.

		policyCount++

		_, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
		if err != nil {
			return fmt.Errorf("could not migrate channel "+
				"policy %d: %w", policy.ChannelID, err)
		}

		return nil
	}

	// Iterate over each channel in the KV store and migrate it and its
	// policies to the SQL database.
	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
		policy1 *models.ChannelEdgePolicy,
		policy2 *models.ChannelEdgePolicy) error {

		scid := channel.ChannelID

		// Here, we do a sanity check to ensure that the chain hash of
		// the channel returned by the KV store matches the expected
		// chain hash. This is important since in the SQL store, we will
		// no longer explicitly store the chain hash in the channel
		// info, but rather rely on the chain hash LND is running with.
		// So this is our way of ensuring that LND is running on the
		// correct network at migration time.
		if channel.ChainHash != chain {
			return fmt.Errorf("channel %d has chain hash %s, "+
				"expected %s", scid, channel.ChainHash, chain)
		}

		// Sanity check to ensure that the channel has valid extra
		// opaque data. If it does not, we'll skip it. We need to do
		// this because previously we would just persist any TLV bytes
		// that we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a channel with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			log.Warnf("Skipping channel %d with invalid "+
				"extra opaque data: %v", scid,
				channel.ExtraOpaqueData)

			skippedChanCount++

			// If we skip a channel, we also skip its policies.
			if policy1 != nil {
				skippedPolicyCount++
			}
			if policy2 != nil {
				skippedPolicyCount++
			}

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data for channel %d (%v): %w", scid,
				channel.ExtraOpaqueData, err)
		}

		channelCount++
		err = migrateSingleChannel(
			ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
		)
		if err != nil {
			return fmt.Errorf("could not migrate channel %d: %w",
				scid, err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	log.Infof("Migrated %d channels and %d policies from KV to SQL "+
		"(skipped %d channels and %d policies due to invalid TLV "+
		"streams)", channelCount, policyCount, skippedChanCount,
		skippedPolicyCount)

	return nil
}

func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
	channel *models.ChannelEdgeInfo,
	policy1, policy2 *models.ChannelEdgePolicy,
	migChanPolicy func(*models.ChannelEdgePolicy) error) error {

	scid := channel.ChannelID

	// First, migrate the channel info along with its policies.
	dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
	if err != nil {
		return fmt.Errorf("could not insert record for channel %d "+
			"in SQL store: %w", scid, err)
	}

	// Now, migrate the two channel policies.
	err = migChanPolicy(policy1)
	if err != nil {
		return fmt.Errorf("could not migrate policy1(%d): %w", scid,
			err)
	}
	err = migChanPolicy(policy2)
	if err != nil {
		return fmt.Errorf("could not migrate policy2(%d): %w", scid,
			err)
	}

	// Now, fetch the channel and its policies from the SQL DB.
	row, err := sqlDB.GetChannelBySCIDWithPolicies(
		ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
			Scid:    channelIDToBytes(scid),
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
			err)
	}

	// Assert that the DB IDs for the channel and nodes are as expected
	// given the inserted channel info.
	err = sqldb.CompareRecords(
		dbChanInfo.channelID, row.Channel.ID, "channel DB ID",
	)
	if err != nil {
		return err
	}
	err = sqldb.CompareRecords(
		dbChanInfo.node1ID, row.Node.ID, "node1 DB ID",
	)
	if err != nil {
		return err
	}
	err = sqldb.CompareRecords(
		dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID",
	)
	if err != nil {
		return err
	}

	migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
		ctx, sqlDB, row, channel.ChainHash,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel and "+
			"policies: %w", err)
	}

	// Finally, compare the original channel info and
	// policies with the migrated ones to ensure they match.
	if len(channel.ExtraOpaqueData) == 0 {
		channel.ExtraOpaqueData = nil
	}
	if len(migChan.ExtraOpaqueData) == 0 {
		migChan.ExtraOpaqueData = nil
	}

	err = sqldb.CompareRecords(
		channel, migChan, fmt.Sprintf("channel %d", scid),
	)
	if err != nil {
		return err
	}

	checkPolicy := func(expPolicy,
		migPolicy *models.ChannelEdgePolicy) error {

		switch {
		// Both policies are nil, nothing to compare.
		case expPolicy == nil && migPolicy == nil:
			return nil

		// One of the policies is nil, but the other is not.
		case expPolicy == nil || migPolicy == nil:
			return fmt.Errorf("expected both policies to be "+
				"non-nil. Got expPolicy: %v, "+
				"migPolicy: %v", expPolicy, migPolicy)

		// Both policies are non-nil, we can compare them.
		default:
		}

		if len(expPolicy.ExtraOpaqueData) == 0 {
			expPolicy.ExtraOpaqueData = nil
		}
		if len(migPolicy.ExtraOpaqueData) == 0 {
			migPolicy.ExtraOpaqueData = nil
		}

		return sqldb.CompareRecords(
			*expPolicy, *migPolicy, "channel policy",
		)
	}

	err = checkPolicy(policy1, migPol1)
	if err != nil {
		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
			err)
	}

	err = checkPolicy(policy2, migPol2)
	if err != nil {
		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
			err)
	}

	return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var (
		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash
	)

	// migrateSinglePruneEntry is a helper function that inserts a single
	// prune log entry into the SQL database and verifies that it was
	// inserted correctly.
	migrateSinglePruneEntry := func(height uint32,
		hash *chainhash.Hash) error {

		count++

		// Keep track of the prune tip height and hash.
		if height > pruneTipHeight {
			pruneTipHeight = height
			pruneTipHash = *hash
		}

		err := sqlDB.UpsertPruneLogEntry(
			ctx, sqlc.UpsertPruneLogEntryParams{
				BlockHeight: int64(height),
				BlockHash:   hash[:],
			},
		)
		if err != nil {
			return fmt.Errorf("unable to insert prune log "+
				"entry for height %d: %w", height, err)
		}

		// Now, check that the entry was inserted correctly.
		migratedHash, err := sqlDB.GetPruneHashByHeight(
			ctx, int64(height),
		)
		if err != nil {
			return fmt.Errorf("could not get prune hash "+
				"for height %d: %w", height, err)
		}

		return sqldb.CompareRecords(
			hash[:], migratedHash, "prune log entry",
		)
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(
		kvBackend, func(height uint32, hash *chainhash.Hash) error {
			err := migrateSinglePruneEntry(height, hash)
			if err != nil {
				return fmt.Errorf("could not migrate "+
					"prune log entry at height %d: %w",
					height, err)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// Check that the prune tip is set correctly in the SQL
	// database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// The ErrGraphNeverPruned error is expected if no prune log
		// entries were migrated from the kvdb store. Otherwise, it's
		// an unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")
			return nil
		}
		// Fall-through to the next error check.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
		"tip is: height %d, hash: %s", count, pruneTipHeight,
		pruneTipHash)

	return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
	row sqlc.GetChannelBySCIDWithPoliciesRow,
	chain chainhash.Hash) (*models.ChannelEdgeInfo,
	*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

	node1, node2, err := buildNodeVertices(
		row.Node.PubKey, row.Node_2.PubKey,
	)
	if err != nil {
		return nil, nil, nil, err
	}

	edge, err := getAndBuildEdgeInfo(
		ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to build channel "+
			"info: %w", err)
	}

	dbPol1, dbPol2, err := extractChannelPolicies(row)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to extract channel "+
			"policies: %w", err)
	}

	policy1, policy2, err := getAndBuildChanPolicies(
		ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
	)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("unable to build channel "+
			"policies: %w", err)
	}

	return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
	hash *chainhash.Hash) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		metaBucket := tx.ReadBucket(graphMetaBucket)
		if metaBucket == nil {
			return ErrGraphNotFound
		}

		pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
		if pruneBucket == nil {
			// The graph has never been pruned and so, there are no
			// entries to iterate over.
			return nil
		}

		return pruneBucket.ForEach(func(k, v []byte) error {
			blockHeight := byteOrder.Uint32(k)
			var blockHash chainhash.Hash
			copy(blockHash[:], v)

			return cb(blockHeight, &blockHash)
		})
	}, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var count uint64
	migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
		count++

		chanIDB := channelIDToBytes(scid.ToUint64())
		err := sqlDB.InsertClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not insert closed channel "+
				"with SCID %s: %w", scid, err)
		}

		// Now, verify that the channel with the given SCID is
		// seen as closed.
		isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not check if channel %s "+
				"is closed: %w", scid, err)
		}

		if !isClosed {
			return fmt.Errorf("channel %s should be closed, "+
				"but is not", scid)
		}

		return nil
	}

	err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

	return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	var count uint64
	err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
		pubKey2 [33]byte) error {

		chanIDB := channelIDToBytes(chanID)

		// If it is in the closed SCID index, we don't need to
		// add it to the zombie index.
		//
		// NOTE: this means that the resulting zombie index count in
		// the SQL store may well be less than the count of zombie
		// channels in the KV store.
		isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not check closed "+
				"channel: %w", err)
		}
		if isClosed {
			return nil
		}

		count++

		err = sqlDB.UpsertZombieChannel(
			ctx, sqlc.UpsertZombieChannelParams{
				Version:  int16(ProtocolV1),
				Scid:     chanIDB,
				NodeKey1: pubKey1[:],
				NodeKey2: pubKey2[:],
			},
		)
		if err != nil {
			return fmt.Errorf("could not upsert zombie "+
				"channel %d: %w", chanID, err)
		}

		// Finally, verify that the channel is indeed marked as a
		// zombie channel.
		isZombie, err := sqlDB.IsZombieChannel(
			ctx, sqlc.IsZombieChannelParams{
				Version: int16(ProtocolV1),
				Scid:    chanIDB,
			},
		)
		if err != nil {
			return fmt.Errorf("could not check if "+
				"channel %d is zombie: %w", chanID, err)
		}

		if !isZombie {
			return fmt.Errorf("channel %d should be "+
				"a zombie, but is not", chanID)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Migrated %d zombie channels from KV to SQL", count)

	return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
	pubKey2 [33]byte) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		return zombieIndex.ForEach(func(k, v []byte) error {
			var pubKey1, pubKey2 [33]byte
			copy(pubKey1[:], v[:33])
			copy(pubKey2[:], v[33:])

			return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
		})
	}, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
	cb func(lnwire.ShortChannelID) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		closedScids := tx.ReadBucket(closedScidBucket)
		if closedScids == nil {
			return nil
		}

		return closedScids.ForEach(func(k, _ []byte) error {
			return cb(lnwire.NewShortChanIDFromInt(
				byteOrder.Uint64(k),
			))
		})
	}, func() {})
}