• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16073633959

04 Jul 2025 12:19PM UTC coverage: 57.55% (-0.3%) from 57.822%
16073633959

Pull #10025

github

web-flow
Merge eefe1c73d into b3eb9a3cb
Pull Request #10025: [draft] graph/db: kvdb -> SQL migration

7 of 658 new or added lines in 4 files covered. (1.06%)

369 existing lines in 12 files now uncovered.

98434 of 171042 relevant lines covered (57.55%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_migration.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "errors"
8
        "fmt"
9
        "reflect"
10
        "sort"
11
        "time"
12

13
        "github.com/btcsuite/btcd/chaincfg/chainhash"
14
        "github.com/davecgh/go-spew/spew"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/kvdb"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
19
        "github.com/pmezard/go-difflib/difflib"
20
)
21

22
// ErrMigrationMismatch is returned when a migrated graph record does not match
// the original record.
//
// NOTE: declared with errors.New rather than fmt.Errorf since the message
// contains no format verbs.
var ErrMigrationMismatch = errors.New("migrated graph record does not match " +
	"original record")
26

27
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
28
// backend.
29
//
30
// NOTE: this is currently not called from any code path. It is called via tests
31
// only for now and will be called from the main lnd binary once the
32
// migration is fully implemented and tested.
33
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
NEW
34
        sqlDB SQLQueries, chain chainhash.Hash) error {
×
NEW
35

×
NEW
36
        log.Infof("Starting migration of the graph store from KV to SQL")
×
NEW
37
        t0 := time.Now()
×
NEW
38

×
NEW
39
        // Check if there is a graph to migrate.
×
NEW
40
        graphExists, err := checkGraphExists(kvBackend)
×
NEW
41
        if err != nil {
×
NEW
42
                return fmt.Errorf("failed to check graph existence: %w", err)
×
NEW
43
        }
×
NEW
44
        if !graphExists {
×
NEW
45
                log.Infof("No graph found in KV store, skipping the migration")
×
NEW
46
                return nil
×
NEW
47
        }
×
48

49
        // 1) Migrate all the nodes.
NEW
50
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
×
NEW
51
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
52
        }
×
53

54
        // 2) Migrate the source node.
NEW
55
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
×
NEW
56
                return fmt.Errorf("could not migrate source node: %w", err)
×
NEW
57
        }
×
58

59
        // 3) Migrate all the channels and channel policies.
NEW
60
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
×
NEW
61
        if err != nil {
×
NEW
62
                return fmt.Errorf("could not migrate channels and policies: %w",
×
NEW
63
                        err)
×
NEW
64
        }
×
65

66
        // 4) Migrate the Prune log.
NEW
67
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
×
NEW
68
                return fmt.Errorf("could not migrate prune log: %w", err)
×
NEW
69
        }
×
70

71
        // 5) Migrate the closed SCID index.
NEW
72
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
×
NEW
73
        if err != nil {
×
NEW
74
                return fmt.Errorf("could not migrate closed SCID index: %w",
×
NEW
75
                        err)
×
NEW
76
        }
×
77

78
        // 6) Migrate the zombie index.
NEW
79
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
×
NEW
80
                return fmt.Errorf("could not migrate zombie index: %w", err)
×
NEW
81
        }
×
82

NEW
83
        log.Infof("Finished migration of the graph store from KV to SQL in %v",
×
NEW
84
                time.Since(t0))
×
NEW
85

×
NEW
86
        return nil
×
87
}
88

89
// checkGraphExists checks if the graph exists in the KV backend.
NEW
90
func checkGraphExists(db kvdb.Backend) (bool, error) {
×
NEW
91
        // Check if there is even a graph to migrate.
×
NEW
92
        err := db.View(func(tx kvdb.RTx) error {
×
NEW
93
                // Check for the existence of the node bucket which is a top
×
NEW
94
                // level bucket that would have been created on the initial
×
NEW
95
                // creation of the graph store.
×
NEW
96
                nodes := tx.ReadBucket(nodeBucket)
×
NEW
97
                if nodes == nil {
×
NEW
98
                        return ErrGraphNotFound
×
NEW
99
                }
×
100

NEW
101
                return nil
×
NEW
102
        }, func() {})
×
NEW
103
        if errors.Is(err, ErrGraphNotFound) {
×
NEW
104
                return false, nil
×
NEW
105
        } else if err != nil {
×
NEW
106
                return false, fmt.Errorf("failed to check graph existence: %w",
×
NEW
107
                        err)
×
NEW
108
        }
×
109

NEW
110
        return true, nil
×
111
}
112

113
// migrateNodes migrates all nodes from the KV backend to the SQL database.
114
// This includes doing a sanity check after each migration to ensure that the
115
// migrated node matches the original node.
116
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
NEW
117
        sqlDB SQLQueries) error {
×
NEW
118

×
NEW
119
        // Keep track of the number of nodes migrated and the number of
×
NEW
120
        // nodes skipped due to errors.
×
NEW
121
        var (
×
NEW
122
                count   uint64
×
NEW
123
                skipped uint64
×
NEW
124
        )
×
NEW
125

×
NEW
126
        // Loop through each node in the KV store and insert it into the SQL
×
NEW
127
        // database.
×
NEW
128
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
×
NEW
129
                node *models.LightningNode) error {
×
NEW
130

×
NEW
131
                pub := node.PubKeyBytes
×
NEW
132

×
NEW
133
                // Sanity check to ensure that the node has valid extra opaque
×
NEW
134
                // data. If it does not, we'll skip it. We need to do this
×
NEW
135
                // because previously we would just persist any TLV bytes that
×
NEW
136
                // we received without validating them. Now, however, we
×
NEW
137
                // normalise the storage of extra opaque data, so we need to
×
NEW
138
                // ensure that the data is valid. We don't want to abort the
×
NEW
139
                // migration if we encounter a node with invalid extra opaque
×
NEW
140
                // data, so we'll just skip it and log a warning.
×
NEW
141
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
×
NEW
142
                if errors.Is(err, ErrParsingExtraTLVBytes) {
×
NEW
143
                        skipped++
×
NEW
144
                        log.Warnf("Skipping migration of node %x with invalid "+
×
NEW
145
                                "extra opaque data: %v", pub,
×
NEW
146
                                node.ExtraOpaqueData)
×
NEW
147

×
NEW
148
                        return nil
×
NEW
149
                } else if err != nil {
×
NEW
150
                        return fmt.Errorf("unable to marshal extra "+
×
NEW
151
                                "opaque data for node %x: %w", pub, err)
×
NEW
152
                }
×
153

NEW
154
                count++
×
NEW
155

×
NEW
156
                // Write the node to the SQL database.
×
NEW
157
                id, err := upsertNode(ctx, sqlDB, node)
×
NEW
158
                if err != nil {
×
NEW
159
                        return fmt.Errorf("could not persist node(%x): %w", pub,
×
NEW
160
                                err)
×
NEW
161
                }
×
162

163
                // Fetch it from the SQL store and compare it against the
164
                // original node object to ensure the migration was successful.
NEW
165
                dbNode, err := sqlDB.GetNodeByPubKey(
×
NEW
166
                        ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
167
                                PubKey:  node.PubKeyBytes[:],
×
NEW
168
                                Version: int16(ProtocolV1),
×
NEW
169
                        },
×
NEW
170
                )
×
NEW
171
                if err != nil {
×
NEW
172
                        return fmt.Errorf("could not get node by pubkey (%x)"+
×
NEW
173
                                "after migration: %w", pub, err)
×
NEW
174
                }
×
175

176
                // Sanity check: ensure the migrated node ID matches the one we
177
                // just inserted.
NEW
178
                if dbNode.ID != id {
×
NEW
179
                        return fmt.Errorf("node ID mismatch for node (%x) "+
×
NEW
180
                                "after migration: expected %d, got %d",
×
NEW
181
                                pub, id, dbNode.ID)
×
NEW
182
                }
×
183

NEW
184
                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
×
NEW
185
                if err != nil {
×
NEW
186
                        return fmt.Errorf("could not build migrated node "+
×
NEW
187
                                "from dbNode(db id: %d, node pub: %x): %w",
×
NEW
188
                                dbNode.ID, pub, err)
×
NEW
189
                }
×
190

191
                // Make sure that the node addresses are sorted before
192
                // comparing them to ensure that the order of addresses does
193
                // not affect the comparison.
NEW
194
                sort.Slice(node.Addresses, func(i, j int) bool {
×
NEW
195
                        return node.Addresses[i].String() <
×
NEW
196
                                node.Addresses[j].String()
×
NEW
197
                })
×
NEW
198
                sort.Slice(migratedNode.Addresses, func(i, j int) bool {
×
NEW
199
                        return migratedNode.Addresses[i].String() <
×
NEW
200
                                migratedNode.Addresses[j].String()
×
NEW
201
                })
×
202

NEW
203
                return compare(node, migratedNode, fmt.Sprintf("node %x", pub))
×
204
        })
NEW
205
        if err != nil {
×
NEW
206
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
207
        }
×
208

NEW
209
        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
×
NEW
210
                "invalid TLV streams)", count, skipped)
×
NEW
211

×
NEW
212
        return nil
×
213
}
214

215
// migrateSourceNode migrates the source node from the KV backend to the
216
// SQL database.
217
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
NEW
218
        sqlDB SQLQueries) error {
×
NEW
219

×
NEW
220
        sourceNode, err := sourceNode(kvdb)
×
NEW
221
        if errors.Is(err, ErrSourceNodeNotSet) {
×
NEW
222
                // If the source node has not been set yet, we can skip this
×
NEW
223
                // migration step.
×
NEW
224
                return nil
×
NEW
225
        } else if err != nil {
×
NEW
226
                return fmt.Errorf("could not get source node from kv "+
×
NEW
227
                        "store: %w", err)
×
NEW
228
        }
×
229

NEW
230
        pub := sourceNode.PubKeyBytes
×
NEW
231

×
NEW
232
        // Get the DB ID of the source node by its public key. This node must
×
NEW
233
        // already exist in the SQL database, as it should have been migrated
×
NEW
234
        // in the previous node-migration step.
×
NEW
235
        id, err := sqlDB.GetNodeIDByPubKey(
×
NEW
236
                ctx, sqlc.GetNodeIDByPubKeyParams{
×
NEW
237
                        PubKey:  pub[:],
×
NEW
238
                        Version: int16(ProtocolV1),
×
NEW
239
                },
×
NEW
240
        )
×
NEW
241
        if err != nil {
×
NEW
242
                return fmt.Errorf("could not get source node ID: %w", err)
×
NEW
243
        }
×
244

245
        // Now we can add the source node to the SQL database.
NEW
246
        err = sqlDB.AddSourceNode(ctx, id)
×
NEW
247
        if err != nil {
×
NEW
248
                return fmt.Errorf("could not add source node to SQL store: %w",
×
NEW
249
                        err)
×
NEW
250
        }
×
251

252
        // Verify that the source node was added correctly by fetching it back
253
        // from the SQL database and checking that the expected DB ID and
254
        // pub key are returned. We don't need to do a whole node comparison
255
        // here, as this was already done in the previous migration step.
NEW
256
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
×
NEW
257
        if err != nil {
×
NEW
258
                return fmt.Errorf("could not get source nodes from SQL "+
×
NEW
259
                        "store: %w", err)
×
NEW
260
        }
×
261

262
        // The SQL store has support for multiple source nodes (for future
263
        // protocol versions) but this migration is purely aimed at the V1
264
        // store, and so we expect exactly one source node to be present.
NEW
265
        if len(srcNodes) != 1 {
×
NEW
266
                return fmt.Errorf("expected exactly one source node, "+
×
NEW
267
                        "got %d", len(srcNodes))
×
NEW
268
        }
×
269

270
        // Check that the source node ID and pub key match the original
271
        // source node.
NEW
272
        if srcNodes[0].NodeID != id {
×
NEW
273
                return fmt.Errorf("source node ID mismatch after migration: "+
×
NEW
274
                        "expected %d, got %d", id, srcNodes[0].NodeID)
×
NEW
275
        }
×
NEW
276
        err = compare(pub[:], srcNodes[0].PubKey, "source node")
×
NEW
277
        if err != nil {
×
NEW
278
                return fmt.Errorf("source node pubkey mismatch after "+
×
NEW
279
                        "migration: %w", err)
×
NEW
280
        }
×
281

NEW
282
        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])
×
NEW
283

×
NEW
284
        return nil
×
285
}
286

287
// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries, chain chainhash.Hash) error {

	// Counters for the migration summary log below: how many channels and
	// policies were migrated, and how many were skipped because their
	// extra opaque data failed TLV validation.
	var (
		channelCount       uint64
		skippedChanCount   uint64
		policyCount        uint64
		skippedPolicyCount uint64
	)
	// migChanPolicy migrates a single (possibly nil) channel policy to the
	// SQL database. It is passed down to migrateSingleChannel so that the
	// skip/migrate counters above are shared across all channels.
	migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
		// If the policy is nil, we can skip it.
		if policy == nil {
			return nil
		}

		// Sanity check to ensure that the policy has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a policy with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(policy.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skippedPolicyCount++
			log.Warnf("Skipping policy for channel %d with "+
				"invalid extra opaque data: %v",
				policy.ChannelID, policy.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data: %w. %+v", err, policy.ExtraOpaqueData)
		}

		policyCount++

		_, _, _, err = updateChanEdgePolicy(ctx, sqlDB, policy)
		if err != nil {
			return fmt.Errorf("could not migrate channel "+
				"policy %d: %w", policy.ChannelID, err)
		}

		return nil
	}

	// Iterate over each channel in the KV store and migrate it and its
	// policies to the SQL database.
	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
		policy1 *models.ChannelEdgePolicy,
		policy2 *models.ChannelEdgePolicy) error {

		scid := channel.ChannelID

		// Here, we do a sanity check to ensure that the chain hash of
		// the channel returned by the KV store matches the expected
		// chain hash. This is important since in the SQL store, we will
		// no longer explicitly store the chain hash in the channel
		// info, but rather rely on the chain hash LND is running with.
		// So this is our way of ensuring that LND is running on the
		// correct network at migration time.
		if channel.ChainHash != chain {
			return fmt.Errorf("channel %d has chain hash %s, "+
				"expected %s", scid, channel.ChainHash, chain)
		}

		// Sanity check to ensure that the channel has valid extra
		// opaque data. If it does not, we'll skip it. We need to do
		// this because previously we would just persist any TLV bytes
		// that we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a channel with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			log.Warnf("Skipping channel %d with invalid "+
				"extra opaque data: %v", scid,
				channel.ExtraOpaqueData)

			skippedChanCount++

			// If we skip a channel, we also skip its policies.
			if policy1 != nil {
				skippedPolicyCount++
			}
			if policy2 != nil {
				skippedPolicyCount++
			}

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data for channel %d: %w %v", scid, err,
				channel.ExtraOpaqueData)
		}

		channelCount++
		err = migrateSingleChannel(
			ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
		)
		if err != nil {
			return fmt.Errorf("could not migrate channel %d: %w",
				scid, err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	log.Infof("Migrated %d channels and %d policies from KV to SQL "+
		"(skipped %d channels and %d policies due to invalid TLV "+
		"streams)", channelCount, policyCount, skippedChanCount,
		skippedPolicyCount)

	return nil
}
410

411
// migrateSingleChannel migrates a single channel and its two (possibly nil)
// policies from the KV backend to the SQL database, then reads the channel
// back from SQL and verifies that the round-tripped data matches the
// original. The migChanPolicy callback is supplied by the caller so that
// policy skip/migrate counters are shared across all channels.
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
	channel *models.ChannelEdgeInfo,
	policy1, policy2 *models.ChannelEdgePolicy,
	migChanPolicy func(*models.ChannelEdgePolicy) error) error {

	scid := channel.ChannelID

	// First, migrate the channel info along with its policies.
	dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
	if err != nil {
		return fmt.Errorf("could not insert record for channel %d "+
			"in SQL store: %w", scid, err)
	}

	// Now, migrate the two channel policies.
	err = migChanPolicy(policy1)
	if err != nil {
		return fmt.Errorf("could not migrate policy1(%d): %w", scid,
			err)
	}
	err = migChanPolicy(policy2)
	if err != nil {
		return fmt.Errorf("could not migrate policy2(%d): %w", scid,
			err)
	}

	// Now, fetch the channel and its policies from the SQL DB.
	row, err := sqlDB.GetChannelBySCIDWithPolicies(
		ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
			Scid:    channelIDToBytes(scid),
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
			err)
	}

	// Assert that the DB IDs for the channel and nodes are as expected
	// given the inserted channel info.
	err = compare(dbChanInfo.channelID, row.Channel.ID, "channel DB ID")
	if err != nil {
		return err
	}
	err = compare(dbChanInfo.node1ID, row.Node.ID, "node1 DB ID")
	if err != nil {
		return err
	}
	err = compare(dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID")
	if err != nil {
		return err
	}

	migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
		ctx, sqlDB, row, channel.ChainHash,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel and "+
			"policies: %w", err)
	}

	// Finally, compare the original channel info and
	// policies with the migrated ones to ensure they match.
	//
	// Normalise empty-but-non-nil ExtraOpaqueData slices to nil on both
	// sides so the deep comparison does not flag a nil-vs-empty
	// difference. NOTE(review): this mutates the caller's channel value —
	// presumably acceptable since the data is semantically unchanged.
	if len(channel.ExtraOpaqueData) == 0 {
		channel.ExtraOpaqueData = nil
	}
	if len(migChan.ExtraOpaqueData) == 0 {
		migChan.ExtraOpaqueData = nil
	}

	err = compare(channel, migChan, fmt.Sprintf("channel %d", scid))
	if err != nil {
		return err
	}

	// checkPolicy compares one original policy against its migrated
	// counterpart, treating two nils as a match and applying the same
	// nil-vs-empty normalisation as above.
	checkPolicy := func(expPolicy,
		migPolicy *models.ChannelEdgePolicy) error {

		switch {
		// Both policies are nil, nothing to compare.
		case expPolicy == nil && migPolicy == nil:
			return nil

		// One of the policies is nil, but the other is not.
		case expPolicy == nil || migPolicy == nil:
			return fmt.Errorf("expected both policies to be "+
				"non-nil. Got expPolicy: %v, "+
				"migPolicy: %v", expPolicy, migPolicy)

		// Both policies are non-nil, we can compare them.
		default:
		}

		if len(expPolicy.ExtraOpaqueData) == 0 {
			expPolicy.ExtraOpaqueData = nil
		}
		if len(migPolicy.ExtraOpaqueData) == 0 {
			migPolicy.ExtraOpaqueData = nil
		}

		return compare(*expPolicy, *migPolicy, "channel policy")
	}

	err = checkPolicy(policy1, migPol1)
	if err != nil {
		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
			err)
	}

	err = checkPolicy(policy2, migPol2)
	if err != nil {
		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
			err)
	}

	return nil
}
528

529
// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
	sqlDB SQLQueries) error {

	// count tracks the number of migrated entries; pruneTipHeight/Hash
	// track the highest-height entry seen so far, which must equal the
	// prune tip reported by the SQL store after migration.
	var (
		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash
	)

	// migrateSinglePruneEntry is a helper function that inserts a single
	// prune log entry into the SQL database and verifies that it was
	// inserted correctly.
	migrateSinglePruneEntry := func(height uint32,
		hash *chainhash.Hash) error {

		count++

		// Keep track of the prune tip height and hash.
		if height > pruneTipHeight {
			pruneTipHeight = height
			pruneTipHash = *hash
		}

		err := sqlDB.UpsertPruneLogEntry(
			ctx, sqlc.UpsertPruneLogEntryParams{
				BlockHeight: int64(height),
				BlockHash:   hash[:],
			},
		)
		if err != nil {
			return fmt.Errorf("unable to insert prune log "+
				"entry for height %d: %w", height, err)
		}

		// Now, check that the entry was inserted correctly.
		migratedHash, err := sqlDB.GetPruneHashByHeight(
			ctx, int64(height),
		)
		if err != nil {
			return fmt.Errorf("could not get prune hash "+
				"for height %d: %w", height, err)
		}

		return compare(hash[:], migratedHash, "prune log entry")
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(kvBackend,
		func(height uint32, hash *chainhash.Hash) error {
			err := migrateSinglePruneEntry(height, hash)
			if err != nil {
				return fmt.Errorf("could not migrate "+
					"prune log entry at height %d: %w",
					height, err)
			}

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// Check that the prune tip is set correctly in the SQL
	// database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// The ErrGraphNeverPruned error is expected if no prune log
		// entries were migrated from the kvdb store. Otherwise, it's
		// an unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")
			return nil
		}
		// Fall-through to the next error check. With count > 0, a
		// missing prune tip means entries were migrated but the tip
		// query found nothing, so the ErrNoRows is surfaced (wrapped)
		// by the check below.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
		"tip is: height %d, hash: %s", count, pruneTipHeight,
		pruneTipHash)

	return nil
}
632

633
// getAndBuildChanAndPolicies is a helper that builds the channel edge info
634
// and policies from the given row returned by the SQL query
635
// GetChannelBySCIDWithPolicies.
636
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
637
        row sqlc.GetChannelBySCIDWithPoliciesRow,
638
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
NEW
639
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {
×
NEW
640

×
NEW
641
        node1, node2, err := buildNodeVertices(
×
NEW
642
                row.Node.PubKey, row.Node_2.PubKey,
×
NEW
643
        )
×
NEW
644
        if err != nil {
×
NEW
645
                return nil, nil, nil, err
×
NEW
646
        }
×
647

NEW
648
        edge, err := getAndBuildEdgeInfo(
×
NEW
649
                ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
×
NEW
650
        )
×
NEW
651
        if err != nil {
×
NEW
652
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
×
NEW
653
                        "info: %w", err)
×
NEW
654
        }
×
655

NEW
656
        dbPol1, dbPol2, err := extractChannelPolicies(row)
×
NEW
657
        if err != nil {
×
NEW
658
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
×
NEW
659
                        "policies: %w", err)
×
NEW
660
        }
×
661

NEW
662
        policy1, policy2, err := getAndBuildChanPolicies(
×
NEW
663
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
×
NEW
664
        )
×
NEW
665
        if err != nil {
×
NEW
666
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
×
NEW
667
                        "policies: %w", err)
×
NEW
668
        }
×
669

NEW
670
        return edge, policy1, policy2, nil
×
671
}
672

673
// forEachPruneLogEntry iterates over each prune log entry in the KV
674
// backend and calls the provided callback function for each entry.
675
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
NEW
676
        hash *chainhash.Hash) error) error {
×
NEW
677

×
NEW
678
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
679
                metaBucket := tx.ReadBucket(graphMetaBucket)
×
NEW
680
                if metaBucket == nil {
×
NEW
681
                        return ErrGraphNotFound
×
NEW
682
                }
×
683

NEW
684
                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
×
NEW
685
                if pruneBucket == nil {
×
NEW
686
                        // The graph has never been pruned and so, there are no
×
NEW
687
                        // entries to iterate over.
×
NEW
688
                        return nil
×
NEW
689
                }
×
690

NEW
691
                return pruneBucket.ForEach(func(k, v []byte) error {
×
NEW
692
                        blockHeight := byteOrder.Uint32(k)
×
NEW
693
                        var blockHash chainhash.Hash
×
NEW
694
                        copy(blockHash[:], v)
×
NEW
695

×
NEW
696
                        return cb(blockHeight, &blockHash)
×
NEW
697
                })
×
NEW
698
        }, func() {})
×
699
}
700

701
// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
702
// the SQL database. It iterates over each closed SCID in the KV store, inserts
703
// it into the SQL database, and then verifies that the SCID was inserted
704
// correctly by checking if the channel with the given SCID is seen as closed in
705
// the SQL database.
706
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
NEW
707
        sqlDB SQLQueries) error {
×
NEW
708

×
NEW
709
        var count uint64
×
NEW
710
        err := forEachClosedSCID(kvBackend,
×
NEW
711
                func(scid lnwire.ShortChannelID) error {
×
NEW
712
                        count++
×
NEW
713

×
NEW
714
                        chanIDB := channelIDToBytes(scid.ToUint64())
×
NEW
715
                        err := sqlDB.InsertClosedChannel(ctx, chanIDB)
×
NEW
716
                        if err != nil {
×
NEW
717
                                return fmt.Errorf("could not insert closed "+
×
NEW
718
                                        "channel with SCID %s: %w", scid, err)
×
NEW
719
                        }
×
720

721
                        // Now, verify that the channel with the given SCID is
722
                        // seen as closed.
NEW
723
                        isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
×
NEW
724
                        if err != nil {
×
NEW
725
                                return fmt.Errorf("could not check if "+
×
NEW
726
                                        "channel %s is closed: %w", scid, err)
×
NEW
727
                        }
×
728

NEW
729
                        if !isClosed {
×
NEW
730
                                return fmt.Errorf("channel %s should be "+
×
NEW
731
                                        "closed, but is not", scid)
×
NEW
732
                        }
×
733

NEW
734
                        return nil
×
735
                },
736
        )
NEW
737
        if err != nil {
×
NEW
738
                return fmt.Errorf("could not migrate closed SCID index: %w",
×
NEW
739
                        err)
×
NEW
740
        }
×
741

NEW
742
        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)
×
NEW
743

×
NEW
744
        return nil
×
745
}
746

747
// migrateZombieIndex migrates the zombie index from the KV backend to
748
// the SQL database. It iterates over each zombie channel in the KV store,
749
// inserts it into the SQL database, and then verifies that the channel is
750
// indeed marked as a zombie channel in the SQL database.
751
//
752
// NOTE: before inserting an entry into the zombie index, the function checks
753
// if the channel is already marked as closed in the SQL store. If it is,
754
// the entry is skipped. This means that the resulting zombie index count in
755
// the SQL store may well be less than the count of zombie channels in the KV
756
// store.
757
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
NEW
758
        sqlDB SQLQueries) error {
×
NEW
759

×
NEW
760
        var count uint64
×
NEW
761
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
×
NEW
762
                pubKey2 [33]byte) error {
×
NEW
763

×
NEW
764
                chanIDB := channelIDToBytes(chanID)
×
NEW
765

×
NEW
766
                // If it is in the closed SCID index, we don't need to
×
NEW
767
                // add it to the zombie index.
×
NEW
768
                //
×
NEW
769
                // NOTE: this means that the resulting zombie index count in
×
NEW
770
                // the SQL store may well be less than the count of zombie
×
NEW
771
                // channels in the KV store.
×
NEW
772
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
×
NEW
773
                if err != nil {
×
NEW
774
                        return fmt.Errorf("could not check closed "+
×
NEW
775
                                "channel: %w", err)
×
NEW
776
                }
×
NEW
777
                if isClosed {
×
NEW
778
                        return nil
×
NEW
779
                }
×
780

NEW
781
                count++
×
NEW
782

×
NEW
783
                err = sqlDB.UpsertZombieChannel(
×
NEW
784
                        ctx, sqlc.UpsertZombieChannelParams{
×
NEW
785
                                Version:  int16(ProtocolV1),
×
NEW
786
                                Scid:     chanIDB,
×
NEW
787
                                NodeKey1: pubKey1[:],
×
NEW
788
                                NodeKey2: pubKey2[:],
×
NEW
789
                        },
×
NEW
790
                )
×
NEW
791
                if err != nil {
×
NEW
792
                        return fmt.Errorf("could not upsert zombie "+
×
NEW
793
                                "channel %d: %w", chanID, err)
×
NEW
794
                }
×
795

796
                // Finally, verify that the channel is indeed marked as a
797
                // zombie channel.
NEW
798
                isZombie, err := sqlDB.IsZombieChannel(
×
NEW
799
                        ctx, sqlc.IsZombieChannelParams{
×
NEW
800
                                Version: int16(ProtocolV1),
×
NEW
801
                                Scid:    chanIDB,
×
NEW
802
                        },
×
NEW
803
                )
×
NEW
804
                if err != nil {
×
NEW
805
                        return fmt.Errorf("could not check if "+
×
NEW
806
                                "channel %d is zombie: %w", chanID, err)
×
NEW
807
                }
×
808

NEW
809
                if !isZombie {
×
NEW
810
                        return fmt.Errorf("channel %d should be "+
×
NEW
811
                                "a zombie, but is not", chanID)
×
NEW
812
                }
×
813

NEW
814
                return nil
×
815
        })
NEW
816
        if err != nil {
×
NEW
817
                return fmt.Errorf("could not migrate zombie index: %w", err)
×
NEW
818
        }
×
819

NEW
820
        log.Infof("Migrated %d zombie channels from KV to SQL", count)
×
NEW
821

×
NEW
822
        return nil
×
823
}
824

825
// forEachZombieEntry iterates over each zombie channel entry in the
826
// KV backend and calls the provided callback function for each entry.
827
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
NEW
828
        pubKey2 [33]byte) error) error {
×
NEW
829

×
NEW
830
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
831
                edges := tx.ReadBucket(edgeBucket)
×
NEW
832
                if edges == nil {
×
NEW
833
                        return ErrGraphNoEdgesFound
×
NEW
834
                }
×
NEW
835
                zombieIndex := edges.NestedReadBucket(zombieBucket)
×
NEW
836
                if zombieIndex == nil {
×
NEW
837
                        return nil
×
NEW
838
                }
×
839

NEW
840
                return zombieIndex.ForEach(func(k, v []byte) error {
×
NEW
841
                        var pubKey1, pubKey2 [33]byte
×
NEW
842
                        copy(pubKey1[:], v[:33])
×
NEW
843
                        copy(pubKey2[:], v[33:])
×
NEW
844

×
NEW
845
                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
×
NEW
846
                })
×
NEW
847
        }, func() {})
×
848
}
849

850
// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
851
// the provided callback function for each SCID.
852
func forEachClosedSCID(db kvdb.Backend,
NEW
853
        cb func(lnwire.ShortChannelID) error) error {
×
NEW
854

×
NEW
855
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
NEW
856
                closedScids := tx.ReadBucket(closedScidBucket)
×
NEW
857
                if closedScids == nil {
×
NEW
858
                        return nil
×
NEW
859
                }
×
860

NEW
861
                return closedScids.ForEach(func(k, _ []byte) error {
×
NEW
862
                        return cb(lnwire.NewShortChanIDFromInt(
×
NEW
863
                                byteOrder.Uint64(k),
×
NEW
864
                        ))
×
NEW
865
                })
×
NEW
866
        }, func() {})
×
867
}
868

869
// compare checks if the original and migrated objects are equal. If they
870
// are not, it returns an error with a unified diff of the two objects.
NEW
871
func compare(original, migrated any, identifier string) error {
×
NEW
872
        if reflect.DeepEqual(original, migrated) {
×
NEW
873
                return nil
×
NEW
874
        }
×
875

NEW
876
        diff := difflib.UnifiedDiff{
×
NEW
877
                A:        difflib.SplitLines(spew.Sdump(original)),
×
NEW
878
                B:        difflib.SplitLines(spew.Sdump(migrated)),
×
NEW
879
                FromFile: "Expected",
×
NEW
880
                FromDate: "",
×
NEW
881
                ToFile:   "Actual",
×
NEW
882
                ToDate:   "",
×
NEW
883
                Context:  3,
×
NEW
884
        }
×
NEW
885
        diffText, _ := difflib.GetUnifiedDiffString(diff)
×
NEW
886

×
NEW
887
        return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
×
NEW
888
                diffText)
×
889
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc