• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16023352958

02 Jul 2025 11:02AM UTC coverage: 57.603% (-0.2%) from 57.803%
16023352958

Pull #10025

github

web-flow
Merge d7fd9e180 into 1d2e5472b
Pull Request #10025: [draft] graph/db: kvdb -> SQL migration

15 of 608 new or added lines in 8 files covered. (2.47%)

71 existing lines in 13 files now uncovered.

98475 of 170954 relevant lines covered (57.6%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_migration.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "errors"
8
        "fmt"
9
        "reflect"
10
        "sort"
11
        "time"
12

13
        "github.com/btcsuite/btcd/chaincfg/chainhash"
14
        "github.com/davecgh/go-spew/spew"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/kvdb"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
19
        "github.com/pmezard/go-difflib/difflib"
20
)
21

22
// ErrMigrationMismatch is the sentinel error returned when a record read back
// from the SQL store differs from the KV record it was migrated from. It is
// wrapped with %w by compare, so callers can detect it with errors.Is.
var ErrMigrationMismatch = errors.New("migrated graph record does not match " +
	"original record")
26

27
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
28
// backend.
29
//
30
// NOTE: this is currently not called from any code path. It is called via tests
31
// only for now and will be called from the main lnd binary once the
32
// migration is fully implemented and tested.
33
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
NEW
34
        sqlDB SQLQueries, chain chainhash.Hash) error {
×
NEW
35

×
NEW
36
        log.Infof("Starting migration of the graph store from KV to SQL")
×
NEW
37
        t0 := time.Now()
×
NEW
38

×
NEW
39
        // Check if there is a graph to migrate.
×
NEW
40
        graphExists, err := checkGraphExists(kvBackend)
×
NEW
41
        if err != nil {
×
NEW
42
                return fmt.Errorf("failed to check graph existence: %w", err)
×
NEW
43
        }
×
NEW
44
        if !graphExists {
×
NEW
45
                log.Infof("No graph found in KV store, skipping the migration")
×
NEW
46
                return nil
×
NEW
47
        }
×
48

49
        // 1) Migrate all the nodes.
NEW
50
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
×
NEW
51
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
52
        }
×
53

54
        // 2) Migrate the source node.
NEW
55
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
×
NEW
56
                return fmt.Errorf("could not migrate source node: %w", err)
×
NEW
57
        }
×
58

59
        // 3) Migrate all the channels and channel policies.
NEW
60
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
×
NEW
61
        if err != nil {
×
NEW
62
                return fmt.Errorf("could not migrate channels and policies: %w",
×
NEW
63
                        err)
×
NEW
64
        }
×
65

66
        // 4) Migrate the Prune log.
NEW
67
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
×
NEW
68
                return fmt.Errorf("could not migrate prune log: %w", err)
×
NEW
69
        }
×
70

71
        // 5) Migrate the closed SCID index.
NEW
72
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
×
NEW
73
        if err != nil {
×
NEW
74
                return fmt.Errorf("could not migrate closed SCID index: %w",
×
NEW
75
                        err)
×
NEW
76
        }
×
77

78
        // 6) Migrate the zombie index.
NEW
79
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
×
NEW
80
                return fmt.Errorf("could not migrate zombie index: %w", err)
×
NEW
81
        }
×
82

NEW
83
        log.Infof("Finished migration of the graph store from KV to SQL in %v",
×
NEW
84
                time.Since(t0))
×
NEW
85

×
NEW
86
        return nil
×
87
}
88

89
// checkGraphExists checks if the graph exists in the KV backend.
NEW
90
func checkGraphExists(db kvdb.Backend) (bool, error) {
×
NEW
91
        // Check if there is even a graph to migrate.
×
NEW
92
        err := db.View(func(tx kvdb.RTx) error {
×
NEW
93
                // Check for the existence of the node bucket which is a top
×
NEW
94
                // level bucket that would have been created on the initial
×
NEW
95
                // creation of the graph store.
×
NEW
96
                nodes := tx.ReadBucket(nodeBucket)
×
NEW
97
                if nodes == nil {
×
NEW
98
                        return ErrGraphNotFound
×
NEW
99
                }
×
100

NEW
101
                return nil
×
NEW
102
        }, func() {})
×
NEW
103
        if errors.Is(err, ErrGraphNotFound) {
×
NEW
104
                return false, nil
×
NEW
105
        } else if err != nil {
×
NEW
106
                return false, fmt.Errorf("failed to check graph existence: %w",
×
NEW
107
                        err)
×
NEW
108
        }
×
109

NEW
110
        return true, nil
×
111
}
112

113
// migrateNodes migrates all nodes from the KV backend to the SQL database.
114
// This includes doing a sanity check after each migration to ensure that the
115
// migrated node matches the original node.
116
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
NEW
117
        sqlDB SQLQueries) error {
×
NEW
118

×
NEW
119
        // Loop through each node in the KV store and insert it into the SQL
×
NEW
120
        // database.
×
NEW
121
        var count uint64
×
NEW
122
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
×
NEW
123
                node *models.LightningNode) error {
×
NEW
124

×
NEW
125
                count++
×
NEW
126

×
NEW
127
                // Write the node to the SQL database.
×
NEW
128
                id, err := upsertNode(ctx, sqlDB, node)
×
NEW
129
                if err != nil {
×
NEW
130
                        return fmt.Errorf("could not persist node(%x): %w",
×
NEW
131
                                node.PubKeyBytes, err)
×
NEW
132
                }
×
133

134
                // Fetch it from the SQL store and compare it against the
135
                // original node object to ensure the migration was successful.
NEW
136
                dbNode, err := sqlDB.GetNodeByPubKey(
×
NEW
137
                        ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
138
                                PubKey:  node.PubKeyBytes[:],
×
NEW
139
                                Version: int16(ProtocolV1),
×
NEW
140
                        },
×
NEW
141
                )
×
NEW
142
                if err != nil {
×
NEW
143
                        return fmt.Errorf("could not get node by pubkey (%x)"+
×
NEW
144
                                "after migration: %w", node.PubKeyBytes, err)
×
NEW
145
                }
×
146

147
                // Sanity check: ensure the migrated node ID matches the one we
148
                // just inserted.
NEW
149
                if dbNode.ID != id {
×
NEW
150
                        return fmt.Errorf("node ID mismatch for node (%x) "+
×
NEW
151
                                "after migration: expected %d, got %d",
×
NEW
152
                                node.PubKeyBytes, id, dbNode.ID)
×
NEW
153
                }
×
154

NEW
155
                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
×
NEW
156
                if err != nil {
×
NEW
157
                        return fmt.Errorf("could not build migrated node "+
×
NEW
158
                                "from dbNode: %w", err)
×
NEW
159
                }
×
160

161
                // Make sure that the node addresses are sorted before
162
                // comparing them to ensure that the order of addresses does
163
                // not affect the comparison.
NEW
164
                sort.Slice(node.Addresses, func(i, j int) bool {
×
NEW
165
                        return node.Addresses[i].String() <
×
NEW
166
                                node.Addresses[j].String()
×
NEW
167
                })
×
NEW
168
                sort.Slice(migratedNode.Addresses, func(i, j int) bool {
×
NEW
169
                        return migratedNode.Addresses[i].String() <
×
NEW
170
                                migratedNode.Addresses[j].String()
×
NEW
171
                })
×
172

NEW
173
                return compare(
×
NEW
174
                        node, migratedNode,
×
NEW
175
                        fmt.Sprintf("node %x", node.PubKeyBytes),
×
NEW
176
                )
×
177
        })
NEW
178
        if err != nil {
×
NEW
179
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
180
        }
×
181

NEW
182
        log.Infof("Migrated %d nodes from KV to SQL", count)
×
NEW
183

×
NEW
184
        return nil
×
185
}
186

187
// migrateSourceNode migrates the source node from the KV backend to the
188
// SQL database.
189
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
NEW
190
        sqlDB SQLQueries) error {
×
NEW
191

×
NEW
192
        sourceNode, err := sourceNode(kvdb)
×
NEW
193
        if errors.Is(err, ErrSourceNodeNotSet) {
×
NEW
194
                // If the source node has not been set yet, we can skip this
×
NEW
195
                // migration step.
×
NEW
196
                return nil
×
NEW
197
        } else if err != nil {
×
NEW
198
                return fmt.Errorf("could not get source node from kv "+
×
NEW
199
                        "store: %w", err)
×
NEW
200
        }
×
201

202
        // Get the DB ID of the source node by its public key. This node must
203
        // already exist in the SQL database, as it should have been migrated
204
        // in the previous node-migration step.
NEW
205
        id, err := sqlDB.GetNodeIDByPubKey(
×
NEW
206
                ctx, sqlc.GetNodeIDByPubKeyParams{
×
NEW
207
                        PubKey:  sourceNode.PubKeyBytes[:],
×
NEW
208
                        Version: int16(ProtocolV1),
×
NEW
209
                },
×
NEW
210
        )
×
NEW
211
        if err != nil {
×
NEW
212
                return fmt.Errorf("could not get source node ID: %w", err)
×
NEW
213
        }
×
214

215
        // Now we can add the source node to the SQL database.
NEW
216
        err = sqlDB.AddSourceNode(ctx, id)
×
NEW
217
        if err != nil {
×
NEW
218
                return fmt.Errorf("could not add source node to SQL store: %w",
×
NEW
219
                        err)
×
NEW
220
        }
×
221

222
        // Verify that the source node was added correctly by fetching it back
223
        // from the SQL database and checking that the expected DB ID and
224
        // pub key are returned. We don't need to do a whole node comparison
225
        // here, as this was already done in the previous migration step.
NEW
226
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
×
NEW
227
        if err != nil {
×
NEW
228
                return fmt.Errorf("could not get source nodes from SQL "+
×
NEW
229
                        "store: %w", err)
×
NEW
230
        }
×
231

232
        // The SQL store has support for multiple source nodes (for future
233
        // protocol versions) but this migration is purely aimed at the V1
234
        // store, and so we expect exactly one source node to be present.
NEW
235
        if len(srcNodes) != 1 {
×
NEW
236
                return fmt.Errorf("expected exactly one source node, "+
×
NEW
237
                        "got %d", len(srcNodes))
×
NEW
238
        }
×
239

240
        // Check that the source node ID and pub key match the original
241
        // source node.
NEW
242
        if srcNodes[0].NodeID != id {
×
NEW
243
                return fmt.Errorf("source node ID mismatch after migration: "+
×
NEW
244
                        "expected %d, got %d", id, srcNodes[0].NodeID)
×
NEW
245
        }
×
NEW
246
        err = compare(
×
NEW
247
                sourceNode.PubKeyBytes[:], srcNodes[0].PubKey, "source node",
×
NEW
248
        )
×
NEW
249
        if err != nil {
×
NEW
250
                return fmt.Errorf("source node pubkey mismatch after "+
×
NEW
251
                        "migration: %w", err)
×
NEW
252
        }
×
253

NEW
254
        log.Infof("Migrated source node with pubkey %x to SQL",
×
NEW
255
                sourceNode.PubKeyBytes)
×
NEW
256

×
NEW
257
        return nil
×
258
}
259

260
// migrateChannelsAndPolicies migrates all channels and their policies
261
// from the KV backend to the SQL database.
262
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
NEW
263
        sqlDB SQLQueries, chain chainhash.Hash) error {
×
NEW
264

×
NEW
265
        var (
×
NEW
266
                channelCount uint64
×
NEW
267
                policyCount  uint64
×
NEW
268
        )
×
NEW
269
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
×
NEW
270
                // If the policy is nil, we can skip it.
×
NEW
271
                if policy == nil {
×
NEW
272
                        return nil
×
NEW
273
                }
×
274

NEW
275
                policyCount++
×
NEW
276

×
NEW
277
                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
×
NEW
278
                if err != nil {
×
NEW
279
                        return fmt.Errorf("could not migrate channel "+
×
NEW
280
                                "policy %d: %w", policy.ChannelID, err)
×
NEW
281
                }
×
282

NEW
283
                return nil
×
284
        }
285

NEW
286
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
×
NEW
287
                policy1 *models.ChannelEdgePolicy,
×
NEW
288
                policy2 *models.ChannelEdgePolicy) error {
×
NEW
289

×
NEW
290
                // Here, we do a sanity check to ensure that the chain hash of
×
NEW
291
                // the channel returned by the KV store matches the expected
×
NEW
292
                // chain hash. This is important since in the SQL store, we will
×
NEW
293
                // no longer explicitly store the chain hash in the channel
×
NEW
294
                // info, but rather rely on the chain hash LND is running with.
×
NEW
295
                // So this is our way of ensuring that LND is running on the
×
NEW
296
                // correct network at migration time.
×
NEW
297
                if channel.ChainHash != chain {
×
NEW
298
                        return fmt.Errorf("channel %d has chain hash %s, "+
×
NEW
299
                                "expected %s", channel.ChannelID,
×
NEW
300
                                channel.ChainHash, chain)
×
NEW
301
                }
×
302

NEW
303
                channelCount++
×
NEW
304
                err := migrateSingleChannel(
×
NEW
305
                        ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
×
NEW
306
                )
×
NEW
307
                if err != nil {
×
NEW
308
                        return fmt.Errorf("could not migrate channel %d: %w",
×
NEW
309
                                channel.ChannelID, err)
×
NEW
310
                }
×
311

NEW
312
                return nil
×
313
        })
NEW
314
        if err != nil {
×
NEW
315
                return fmt.Errorf("could not migrate channels and policies: %w",
×
NEW
316
                        err)
×
NEW
317
        }
×
318

NEW
319
        log.Infof("Migrated %d channels and %d policies from KV to SQL",
×
NEW
320
                channelCount, policyCount)
×
NEW
321

×
NEW
322
        return nil
×
323
}
324

325
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
326
        channel *models.ChannelEdgeInfo,
327
        policy1, policy2 *models.ChannelEdgePolicy,
NEW
328
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {
×
NEW
329

×
NEW
330
        scid := channel.ChannelID
×
NEW
331

×
NEW
332
        // First, migrate the channel info along with its policies.
×
NEW
333
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
×
NEW
334
        if err != nil {
×
NEW
335
                return fmt.Errorf("could not insert record for channel %d "+
×
NEW
336
                        "in SQL store: %w", scid, err)
×
NEW
337
        }
×
338

339
        // Now, migrate the two channel policies.
NEW
340
        err = migChanPolicy(policy1)
×
NEW
341
        if err != nil {
×
NEW
342
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
×
NEW
343
                        err)
×
NEW
344
        }
×
NEW
345
        err = migChanPolicy(policy2)
×
NEW
346
        if err != nil {
×
NEW
347
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
×
NEW
348
                        err)
×
NEW
349
        }
×
350

351
        // Now, fetch the channel and its policies from the SQL DB.
NEW
352
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
×
NEW
353
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
×
NEW
354
                        Scid:    channelIDToBytes(scid),
×
NEW
355
                        Version: int16(ProtocolV1),
×
NEW
356
                },
×
NEW
357
        )
×
NEW
358
        if err != nil {
×
NEW
359
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
×
NEW
360
                        err)
×
NEW
361
        }
×
362

363
        // Assert that the DB IDs for the channel and nodes are as expected
364
        // given the inserted channel info.
NEW
365
        err = compare(dbChanInfo.channelID, row.Channel.ID, "channel DB ID")
×
NEW
366
        if err != nil {
×
NEW
367
                return err
×
NEW
368
        }
×
NEW
369
        err = compare(dbChanInfo.node1ID, row.Node.ID, "node1 DB ID")
×
NEW
370
        if err != nil {
×
NEW
371
                return err
×
NEW
372
        }
×
NEW
373
        err = compare(dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID")
×
NEW
374
        if err != nil {
×
NEW
375
                return err
×
NEW
376
        }
×
377

NEW
378
        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
×
NEW
379
                ctx, sqlDB, row, channel.ChainHash,
×
NEW
380
        )
×
NEW
381
        if err != nil {
×
NEW
382
                return fmt.Errorf("could not build migrated channel and "+
×
NEW
383
                        "policies: %w", err)
×
NEW
384
        }
×
385

386
        // Finally, compare the original channel info and
387
        // policies with the migrated ones to ensure they match.
NEW
388
        if len(channel.ExtraOpaqueData) == 0 {
×
NEW
389
                channel.ExtraOpaqueData = nil
×
NEW
390
        }
×
NEW
391
        if len(migChan.ExtraOpaqueData) == 0 {
×
NEW
392
                migChan.ExtraOpaqueData = nil
×
NEW
393
        }
×
394

NEW
395
        err = compare(channel, migChan, fmt.Sprintf("channel %d", scid))
×
NEW
396
        if err != nil {
×
NEW
397
                return err
×
NEW
398
        }
×
399

NEW
400
        checkPolicy := func(expPolicy,
×
NEW
401
                migPolicy *models.ChannelEdgePolicy) error {
×
NEW
402

×
NEW
403
                switch {
×
404
                // Both policies are nil, nothing to compare.
NEW
405
                case expPolicy == nil && migPolicy == nil:
×
NEW
406
                        return nil
×
407

408
                // One of the policies is nil, but the other is not.
NEW
409
                case expPolicy == nil || migPolicy == nil:
×
NEW
410
                        return fmt.Errorf("expected both policies to be "+
×
NEW
411
                                "non-nil. Got expPolicy: %v, "+
×
NEW
412
                                "migPolicy: %v", expPolicy, migPolicy)
×
413

414
                // Both policies are non-nil, we can compare them.
NEW
415
                default:
×
416
                }
417

NEW
418
                if len(expPolicy.ExtraOpaqueData) == 0 {
×
NEW
419
                        expPolicy.ExtraOpaqueData = nil
×
NEW
420
                }
×
NEW
421
                if len(migPolicy.ExtraOpaqueData) == 0 {
×
NEW
422
                        migPolicy.ExtraOpaqueData = nil
×
NEW
423
                }
×
424

NEW
425
                return compare(
×
NEW
426
                        *expPolicy, *migPolicy, fmt.Sprintf("channel policy"),
×
NEW
427
                )
×
428

429
        }
430

NEW
431
        err = checkPolicy(policy1, migPol1)
×
NEW
432
        if err != nil {
×
NEW
433
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
×
NEW
434
                        err)
×
NEW
435
        }
×
436

NEW
437
        err = checkPolicy(policy2, migPol2)
×
NEW
438
        if err != nil {
×
NEW
439
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
×
NEW
440
                        err)
×
NEW
441
        }
×
442

NEW
443
        return nil
×
444
}
445

446
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
NEW
447
        sqlDB SQLQueries) error {
×
NEW
448

×
NEW
449
        var (
×
NEW
450
                count          uint64
×
NEW
451
                pruneTipHeight uint32
×
NEW
452
                pruneTipHash   chainhash.Hash
×
NEW
453
        )
×
NEW
454

×
NEW
455
        migrateSinglePruneEntry := func(height uint32,
×
NEW
456
                hash *chainhash.Hash) error {
×
NEW
457

×
NEW
458
                count++
×
NEW
459

×
NEW
460
                if height > pruneTipHeight {
×
NEW
461
                        pruneTipHeight = height
×
NEW
462
                        pruneTipHash = *hash
×
NEW
463
                }
×
464

NEW
465
                err := sqlDB.UpsertPruneLogEntry(
×
NEW
466
                        ctx, sqlc.UpsertPruneLogEntryParams{
×
NEW
467
                                BlockHeight: int64(height),
×
NEW
468
                                BlockHash:   hash[:],
×
NEW
469
                        },
×
NEW
470
                )
×
NEW
471
                if err != nil {
×
NEW
472
                        return fmt.Errorf("unable to insert prune log "+
×
NEW
473
                                "entry: %w", err)
×
NEW
474
                }
×
475

476
                // Now, check that the entry was inserted correctly.
NEW
477
                migratedHash, err := sqlDB.GetPruneHashByHeight(
×
NEW
478
                        ctx, int64(height),
×
NEW
479
                )
×
NEW
480
                if err != nil {
×
NEW
481
                        return fmt.Errorf("could not get prune hash "+
×
NEW
482
                                "by height: %w", err)
×
NEW
483
                }
×
484

NEW
485
                return compare(
×
NEW
486
                        hash[:], migratedHash, fmt.Sprintf("prune log entry"),
×
NEW
487
                )
×
488
        }
489

NEW
490
        err := forEachPruneLogEntry(kvBackend,
×
NEW
491
                func(height uint32, hash *chainhash.Hash) error {
×
NEW
492
                        err := migrateSinglePruneEntry(height, hash)
×
NEW
493
                        if err != nil {
×
NEW
494
                                return fmt.Errorf("could not migrate "+
×
NEW
495
                                        "prune log entry at height %d: %w",
×
NEW
496
                                        height, err)
×
NEW
497
                        }
×
498

NEW
499
                        return nil
×
500
                },
501
        )
NEW
502
        if err != nil {
×
NEW
503
                return fmt.Errorf("could not migrate prune log: %w", err)
×
NEW
504
        }
×
505

506
        // Check that the prune tip is set correctly in the SQL
507
        // database.
NEW
508
        pruneTip, err := sqlDB.GetPruneTip(ctx)
×
NEW
509
        if errors.Is(err, sql.ErrNoRows) {
×
NEW
510
                // The ErrGraphNeverPruned error is expected if no prune log
×
NEW
511
                // entries were migrated from the kvdb store. Otherwise, it's
×
NEW
512
                // an unexpected error.
×
NEW
513
                if count == 0 {
×
NEW
514
                        log.Infof("No prune log entries found in KV store " +
×
NEW
515
                                "to migrate")
×
NEW
516
                        return nil
×
NEW
517
                }
×
518
                // Fall-through to the next error check.
519
        }
NEW
520
        if err != nil {
×
NEW
521
                return fmt.Errorf("could not get prune tip: %w", err)
×
NEW
522
        }
×
523

NEW
524
        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
×
NEW
525
                !bytes.Equal(pruneTip.BlockHash[:], pruneTipHash[:]) {
×
NEW
526

×
NEW
527
                return fmt.Errorf("prune tip mismatch after migration: "+
×
NEW
528
                        "expected height %d, hash %s; got height %d, "+
×
NEW
529
                        "hash %s", pruneTipHeight, pruneTipHash,
×
NEW
530
                        pruneTip.BlockHeight,
×
NEW
531
                        chainhash.Hash(pruneTip.BlockHash))
×
NEW
532
        }
×
533

NEW
534
        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
×
NEW
535
                "tip is: height %d, hash: %s", count, pruneTipHeight,
×
NEW
536
                pruneTipHash)
×
NEW
537

×
NEW
538
        return nil
×
539
}
540

541
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
NEW
542
        sqlDB SQLQueries) error {
×
NEW
543

×
NEW
544
        var count uint64
×
NEW
545
        err := forEachClosedSCID(kvBackend,
×
NEW
546
                func(scid lnwire.ShortChannelID) error {
×
NEW
547
                        count++
×
NEW
548

×
NEW
549
                        chanIDB := channelIDToBytes(scid.ToUint64())
×
NEW
550
                        err := sqlDB.InsertClosedChannel(ctx, chanIDB[:])
×
NEW
551
                        if err != nil {
×
NEW
552
                                return fmt.Errorf("could not insert closed "+
×
NEW
553
                                        "channel with SCID %s: %w", scid, err)
×
NEW
554
                        }
×
555

556
                        // Now, verify that the channel with the given SCID is
557
                        // seen as closed.
NEW
558
                        isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
×
NEW
559
                        if err != nil {
×
NEW
560
                                return fmt.Errorf("could not check if "+
×
NEW
561
                                        "channel %s is closed: %w", scid, err)
×
NEW
562
                        }
×
563

NEW
564
                        if !isClosed {
×
NEW
565
                                return fmt.Errorf("channel %s should be "+
×
NEW
566
                                        "closed, but is not", scid)
×
NEW
567
                        }
×
568

NEW
569
                        return nil
×
570
                },
571
        )
NEW
572
        if err != nil {
×
NEW
573
                return fmt.Errorf("could not migrate closed SCID index: %w",
×
NEW
574
                        err)
×
NEW
575
        }
×
576

NEW
577
        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)
×
NEW
578

×
NEW
579
        return nil
×
580
}
581

582
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
NEW
583
        sqlDB SQLQueries) error {
×
NEW
584

×
NEW
585
        var count uint64
×
NEW
586
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
×
NEW
587
                pubKey2 [33]byte) error {
×
NEW
588

×
NEW
589
                chanIDB := channelIDToBytes(chanID)
×
NEW
590

×
NEW
591
                // If it is in the closed SCID index, we don't need to
×
NEW
592
                // add it to the zombie index.
×
NEW
593
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
×
NEW
594
                if err != nil {
×
NEW
595
                        return fmt.Errorf("could not check closed "+
×
NEW
596
                                "channel: %w", err)
×
NEW
597
                }
×
598

NEW
599
                if isClosed {
×
NEW
600
                        return nil
×
NEW
601
                }
×
602

NEW
603
                count++
×
NEW
604

×
NEW
605
                err = sqlDB.UpsertZombieChannel(
×
NEW
606
                        ctx, sqlc.UpsertZombieChannelParams{
×
NEW
607
                                Version:  int16(ProtocolV1),
×
NEW
608
                                Scid:     chanIDB[:],
×
NEW
609
                                NodeKey1: pubKey1[:],
×
NEW
610
                                NodeKey2: pubKey2[:],
×
NEW
611
                        },
×
NEW
612
                )
×
NEW
613
                if err != nil {
×
NEW
614
                        return fmt.Errorf("could not upsert zombie "+
×
NEW
615
                                "channel %d: %w", chanID, err)
×
NEW
616
                }
×
617

618
                // Finally, verify that the channel is indeed marked as a
619
                // zombie channel.
NEW
620
                isZombie, err := sqlDB.IsZombieChannel(
×
NEW
621
                        ctx, sqlc.IsZombieChannelParams{
×
NEW
622
                                Version: int16(ProtocolV1),
×
NEW
623
                                Scid:    chanIDB,
×
NEW
624
                        },
×
NEW
625
                )
×
NEW
626
                if err != nil {
×
NEW
627
                        return fmt.Errorf("could not check if "+
×
NEW
628
                                "channel %d is zombie: %w", chanID, err)
×
NEW
629
                }
×
630

NEW
631
                if !isZombie {
×
NEW
632
                        return fmt.Errorf("channel %d should be "+
×
NEW
633
                                "a zombie, but is not", chanID)
×
NEW
634
                }
×
635

NEW
636
                return nil
×
637
        })
NEW
638
        if err != nil {
×
NEW
639
                return fmt.Errorf("could not migrate zombie index: %w", err)
×
NEW
640
        }
×
641

NEW
642
        log.Infof("Migrated %d zombie channels from KV to SQL", count)
×
NEW
643

×
NEW
644
        return nil
×
645
}
646

647
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
NEW
648
        pubKey2 [33]byte) error) error {
×
NEW
649

×
NEW
650
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
651
                edges := tx.ReadBucket(edgeBucket)
×
NEW
652
                if edges == nil {
×
NEW
653
                        return ErrGraphNoEdgesFound
×
NEW
654
                }
×
NEW
655
                zombieIndex := edges.NestedReadBucket(zombieBucket)
×
NEW
656
                if zombieIndex == nil {
×
NEW
657
                        return nil
×
NEW
658
                }
×
659

NEW
660
                return zombieIndex.ForEach(func(k, v []byte) error {
×
NEW
661
                        var pubKey1, pubKey2 [33]byte
×
NEW
662
                        copy(pubKey1[:], v[:33])
×
NEW
663
                        copy(pubKey2[:], v[33:])
×
NEW
664

×
NEW
665
                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
×
NEW
666
                })
×
NEW
667
        }, func() {})
×
668
}
669

670
func forEachClosedSCID(db kvdb.Backend,
NEW
671
        cb func(lnwire.ShortChannelID) error) error {
×
NEW
672

×
NEW
673
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
674
                closedScids := tx.ReadBucket(closedScidBucket)
×
NEW
675
                if closedScids == nil {
×
NEW
676
                        return nil
×
NEW
677
                }
×
678

NEW
679
                return closedScids.ForEach(func(k, _ []byte) error {
×
NEW
680
                        return cb(lnwire.NewShortChanIDFromInt(
×
NEW
681
                                byteOrder.Uint64(k),
×
NEW
682
                        ))
×
NEW
683
                })
×
NEW
684
        }, func() {})
×
685
}
686

687
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
688
        row sqlc.GetChannelBySCIDWithPoliciesRow,
689
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
NEW
690
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {
×
NEW
691

×
NEW
692
        node1, node2, err := buildNodeVertices(
×
NEW
693
                row.Node.PubKey, row.Node_2.PubKey,
×
NEW
694
        )
×
NEW
695
        if err != nil {
×
NEW
696
                return nil, nil, nil, err
×
NEW
697
        }
×
698

NEW
699
        edge, err := getAndBuildEdgeInfo(
×
NEW
700
                ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
×
NEW
701
        )
×
NEW
702
        if err != nil {
×
NEW
703
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
×
NEW
704
                        "info: %w", err)
×
NEW
705
        }
×
706

NEW
707
        dbPol1, dbPol2, err := extractChannelPolicies(row)
×
NEW
708
        if err != nil {
×
NEW
709
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
×
NEW
710
                        "policies: %w", err)
×
NEW
711
        }
×
712

NEW
713
        policy1, policy2, err := getAndBuildChanPolicies(
×
NEW
714
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
×
NEW
715
        )
×
NEW
716
        if err != nil {
×
NEW
717
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
×
NEW
718
                        "policies: %w", err)
×
NEW
719
        }
×
720

NEW
721
        return edge, policy1, policy2, nil
×
722
}
723

724
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
NEW
725
        hash *chainhash.Hash) error) error {
×
NEW
726

×
NEW
727
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
728
                metaBucket := tx.ReadBucket(graphMetaBucket)
×
NEW
729
                if metaBucket == nil {
×
NEW
730
                        return ErrGraphNotFound
×
NEW
731
                }
×
732

NEW
733
                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
×
NEW
734
                if pruneBucket == nil {
×
NEW
735
                        // The graph has never been pruned and so, there are no
×
NEW
736
                        // entries to iterate over.
×
NEW
737
                        return nil
×
NEW
738
                }
×
739

NEW
740
                return pruneBucket.ForEach(func(k, v []byte) error {
×
NEW
741
                        blockHeight := byteOrder.Uint32(k)
×
NEW
742
                        var blockHash chainhash.Hash
×
NEW
743
                        copy(blockHash[:], v)
×
NEW
744

×
NEW
745
                        return cb(blockHeight, &blockHash)
×
NEW
746
                })
×
NEW
747
        }, func() {})
×
748
}
749

750
// compare checks if the original and migrated objects are equal. If they
751
// are not, it returns an error with a unified diff of the two objects.
NEW
752
func compare(original, migrated any, identifier string) error {
×
NEW
753
        if reflect.DeepEqual(original, migrated) {
×
NEW
754
                return nil
×
NEW
755
        }
×
756

NEW
757
        diff := difflib.UnifiedDiff{
×
NEW
758
                A:        difflib.SplitLines(spew.Sdump(original)),
×
NEW
759
                B:        difflib.SplitLines(spew.Sdump(migrated)),
×
NEW
760
                FromFile: "Expected",
×
NEW
761
                FromDate: "",
×
NEW
762
                ToFile:   "Actual",
×
NEW
763
                ToDate:   "",
×
NEW
764
                Context:  3,
×
NEW
765
        }
×
NEW
766
        diffText, _ := difflib.GetUnifiedDiffString(diff)
×
NEW
767

×
NEW
768
        return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
×
NEW
769
                diffText)
×
770
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc