
lightningnetwork / lnd — build 16072717768, 04 Jul 2025 11:27AM UTC
Coverage: 67.353% (+9.5%) from 57.822%

Pull Request #10025: [draft] graph/db: kvdb -> SQL migration
(web-flow: merge 9f3eac7c6 into b3eb9a3cb)

10 of 664 new or added lines in 5 files covered (1.51%).
89 existing lines in 8 files now uncovered.
135148 of 200655 relevant lines covered (67.35%).
21869.75 hits per line.

/graph/db/sql_migration.go
package graphdb

import (
        "bytes"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "reflect"
        "sort"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/davecgh/go-spew/spew"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "github.com/pmezard/go-difflib/difflib"
)

// ErrMigrationMismatch is returned when a migrated graph record does not match
// the original record.
var ErrMigrationMismatch = fmt.Errorf("migrated graph record does not match " +
        "original record")

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via
// tests only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the prune log.
        if err := migratePruneLog(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}
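
// The sketch below illustrates how this entry point might eventually be
// invoked from the main binary once the migration ships. The surrounding
// variables (kvBackend, sqlStore and chainParams) are hypothetical and are
// shown for illustration only:
//
//	ctx := context.Background()
//	err := MigrateGraphToSQL(
//	        ctx, kvBackend, sqlStore, *chainParams.GenesisHash,
//	)
//	if err != nil {
//	        return fmt.Errorf("graph migration failed: %w", err)
//	}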

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, fmt.Errorf("failed to check graph existence: %w",
                        err)
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                count   uint64
                skipped uint64
        )

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.LightningNode) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                count++

                // Write the node to the SQL database.
                id, err := upsertNode(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Fetch it from the SQL store and compare it against the
                // original node object to ensure the migration was successful.
                dbNode, err := sqlDB.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                PubKey:  node.PubKeyBytes[:],
                                Version: int16(ProtocolV1),
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not get node by pubkey (%x) "+
                                "after migration: %w", pub, err)
                }

                // Sanity check: ensure the migrated node ID matches the one we
                // just inserted.
                if dbNode.ID != id {
                        return fmt.Errorf("node ID mismatch for node (%x) "+
                                "after migration: expected %d, got %d",
                                pub, id, dbNode.ID)
                }

                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
                if err != nil {
                        return fmt.Errorf("could not build migrated node "+
                                "from dbNode(db id: %d, node pub: %x): %w",
                                dbNode.ID, pub, err)
                }

                // Make sure that the node addresses are sorted before
                // comparing them to ensure that the order of addresses does
                // not affect the comparison.
                sort.Slice(node.Addresses, func(i, j int) bool {
                        return node.Addresses[i].String() <
                                node.Addresses[j].String()
                })
                sort.Slice(migratedNode.Addresses, func(i, j int) bool {
                        return migratedNode.Addresses[i].String() <
                                migratedNode.Addresses[j].String()
                })

                return compare(node, migratedNode, fmt.Sprintf("node %x", pub))
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
                "invalid TLV streams)", count, skipped)

        return nil
}

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        sourceNode, err := sourceNode(kvBackend)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = compare(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries, chain chainhash.Hash) error {

        var (
                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64
        )
        migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Sanity check to ensure that the policy has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a policy with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(policy.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skippedPolicyCount++
                        log.Warnf("Skipping policy for channel %d with "+
                                "invalid extra opaque data: %v",
                                policy.ChannelID, policy.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data (%+v): %w", policy.ExtraOpaqueData, err)
                }

                policyCount++

                _, _, _, err = updateChanEdgePolicy(ctx, sqlDB, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != chain {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash, chain)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                err = migrateSingleChannel(
                        ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
                )
                if err != nil {
                        return fmt.Errorf("could not migrate channel %d: %w",
                                scid, err)
                }

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, skippedChanCount,
                skippedPolicyCount)

        return nil
}

// migrateSingleChannel migrates a single channel and its two policies from
// the KV backend to the SQL database, and then verifies the migrated records
// against the originals.
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
        channel *models.ChannelEdgeInfo,
        policy1, policy2 *models.ChannelEdgePolicy,
        migChanPolicy func(*models.ChannelEdgePolicy) error) error {

        scid := channel.ChannelID

        // First, migrate the channel info along with its policies.
        dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
        if err != nil {
                return fmt.Errorf("could not insert record for channel %d "+
                        "in SQL store: %w", scid, err)
        }

        // Now, migrate the two channel policies.
        err = migChanPolicy(policy1)
        if err != nil {
                return fmt.Errorf("could not migrate policy1(%d): %w", scid,
                        err)
        }
        err = migChanPolicy(policy2)
        if err != nil {
                return fmt.Errorf("could not migrate policy2(%d): %w", scid,
                        err)
        }

        // Now, fetch the channel and its policies from the SQL DB.
        row, err := sqlDB.GetChannelBySCIDWithPolicies(
                ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
                        Scid:    channelIDToBytes(scid),
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
                        err)
        }

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err = compare(dbChanInfo.channelID, row.Channel.ID, "channel DB ID")
        if err != nil {
                return err
        }
        err = compare(dbChanInfo.node1ID, row.Node.ID, "node1 DB ID")
        if err != nil {
                return err
        }
        err = compare(dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID")
        if err != nil {
                return err
        }

        migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
                ctx, sqlDB, row, channel.ChainHash,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel and "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and policies with the
        // migrated ones to ensure they match. Normalise empty extra opaque
        // data to nil first, since reflect.DeepEqual treats nil and empty
        // slices as unequal.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = compare(channel, migChan, fmt.Sprintf("channel %d", scid))
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return compare(*expPolicy, *migPolicy, "channel policy")
        }

        err = checkPolicy(policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It iterates over each prune log entry in the KV store, inserts it
// into the SQL database, and then verifies that the entry was inserted
// correctly by fetching it back from the SQL database and comparing it to the
// original entry.
func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var (
                count          uint64
                pruneTipHeight uint32
                pruneTipHash   chainhash.Hash
        )

        // migrateSinglePruneEntry is a helper function that inserts a single
        // prune log entry into the SQL database and verifies that it was
        // inserted correctly.
        migrateSinglePruneEntry := func(height uint32,
                hash *chainhash.Hash) error {

                count++

                // Keep track of the prune tip height and hash.
                if height > pruneTipHeight {
                        pruneTipHeight = height
                        pruneTipHash = *hash
                }

                err := sqlDB.UpsertPruneLogEntry(
                        ctx, sqlc.UpsertPruneLogEntryParams{
                                BlockHeight: int64(height),
                                BlockHash:   hash[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert prune log "+
                                "entry for height %d: %w", height, err)
                }

                // Now, check that the entry was inserted correctly.
                migratedHash, err := sqlDB.GetPruneHashByHeight(
                        ctx, int64(height),
                )
                if err != nil {
                        return fmt.Errorf("could not get prune hash "+
                                "for height %d: %w", height, err)
                }

                return compare(hash[:], migratedHash, "prune log entry")
        }

        // Iterate over each prune log entry in the KV store and migrate it to
        // the SQL database.
        err := forEachPruneLogEntry(kvBackend,
                func(height uint32, hash *chainhash.Hash) error {
                        err := migrateSinglePruneEntry(height, hash)
                        if err != nil {
                                return fmt.Errorf("could not migrate "+
                                        "prune log entry at height %d: %w",
                                        height, err)
                        }

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // Check that the prune tip is set correctly in the SQL database.
        pruneTip, err := sqlDB.GetPruneTip(ctx)
        if errors.Is(err, sql.ErrNoRows) {
                // A sql.ErrNoRows error is expected if no prune log entries
                // were migrated from the kvdb store (i.e. the graph was never
                // pruned). Otherwise, it's an unexpected error.
                if count == 0 {
                        log.Infof("No prune log entries found in KV store " +
                                "to migrate")
                        return nil
                }
                // Fall-through to the next error check.
        }
        if err != nil {
                return fmt.Errorf("could not get prune tip: %w", err)
        }

        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
                !bytes.Equal(pruneTip.BlockHash[:], pruneTipHash[:]) {

                return fmt.Errorf("prune tip mismatch after migration: "+
                        "expected height %d, hash %s; got height %d, "+
                        "hash %s", pruneTipHeight, pruneTipHash,
                        pruneTip.BlockHeight,
                        chainhash.Hash(pruneTip.BlockHash))
        }

        log.Infof("Migrated %d prune log entries from KV to SQL. The prune "+
                "tip is: height %d, hash: %s", count, pruneTipHeight,
                pruneTipHash)

        return nil
}

// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
        row sqlc.GetChannelBySCIDWithPoliciesRow,
        chain chainhash.Hash) (*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {

        node1, node2, err := buildNodeVertices(
                row.Node.PubKey, row.Node_2.PubKey,
        )
        if err != nil {
                return nil, nil, nil, err
        }

        edge, err := getAndBuildEdgeInfo(
                ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "info: %w", err)
        }

        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to extract channel "+
                        "policies: %w", err)
        }

        policy1, policy2, err := getAndBuildChanPolicies(
                ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
        )
        if err != nil {
                return nil, nil, nil, fmt.Errorf("unable to build channel "+
                        "policies: %w", err)
        }

        return edge, policy1, policy2, nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
        hash *chainhash.Hash) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                metaBucket := tx.ReadBucket(graphMetaBucket)
                if metaBucket == nil {
                        return ErrGraphNotFound
                }

                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
                if pruneBucket == nil {
                        // The graph has never been pruned and so, there are no
                        // entries to iterate over.
                        return nil
                }

                // Each entry maps a block height (encoded with byteOrder) to
                // the hash of the block at that height.
                return pruneBucket.ForEach(func(k, v []byte) error {
                        blockHeight := byteOrder.Uint32(k)
                        var blockHash chainhash.Hash
                        copy(blockHash[:], v)

                        return cb(blockHeight, &blockHash)
                })
        }, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It iterates over each closed SCID in the KV store, inserts
// it into the SQL database, and then verifies that the SCID was inserted
// correctly by checking if the channel with the given SCID is seen as closed in
// the SQL database.
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var count uint64
        err := forEachClosedSCID(kvBackend,
                func(scid lnwire.ShortChannelID) error {
                        count++

                        chanIDB := channelIDToBytes(scid.ToUint64())
                        err := sqlDB.InsertClosedChannel(ctx, chanIDB[:])
                        if err != nil {
                                return fmt.Errorf("could not insert closed "+
                                        "channel with SCID %s: %w", scid, err)
                        }

                        // Now, verify that the channel with the given SCID is
                        // seen as closed.
                        isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                        if err != nil {
                                return fmt.Errorf("could not check if "+
                                        "channel %s is closed: %w", scid, err)
                        }

                        if !isClosed {
                                return fmt.Errorf("channel %s should be "+
                                        "closed, but is not", scid)
                        }

                        return nil
                },
        )
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        log.Infof("Migrated %d closed SCIDs from KV to SQL", count)

        return nil
}

// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
        sqlDB SQLQueries) error {

        var count uint64
        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(ProtocolV1),
                                Scid:     chanIDB[:],
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Finally, verify that the channel is indeed marked as a
                // zombie channel.
                isZombie, err := sqlDB.IsZombieChannel(
                        ctx, sqlc.IsZombieChannelParams{
                                Version: int16(ProtocolV1),
                                Scid:    chanIDB,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not check if "+
                                "channel %d is zombie: %w", chanID, err)
                }

                if !isZombie {
                        return fmt.Errorf("channel %d should be "+
                                "a zombie, but is not", chanID)
                }

                return nil
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Migrated %d zombie channels from KV to SQL", count)

        return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        // Each value holds the two 33-byte compressed public
                        // keys of the channel's nodes, concatenated.
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, func() {})
}
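
// For reference, a sketch of the inverse operation: writing a zombie entry
// in the layout the iterator above expects. The RW transaction plumbing is
// elided and the helper below is hypothetical, shown for illustration only:
//
//	func putZombieEntry(zombieIndex kvdb.RwBucket, chanID uint64,
//	        pubKey1, pubKey2 [33]byte) error {
//
//	        var k [8]byte
//	        byteOrder.PutUint64(k[:], chanID)
//
//	        var v [66]byte
//	        copy(v[:33], pubKey1[:])
//	        copy(v[33:], pubKey2[:])
//
//	        return zombieIndex.Put(k[:], v[:])
//	}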

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, func() {})
}

// compare checks if the original and migrated objects are equal. If they
// are not, it returns an error with a unified diff of the two objects.
func compare(original, migrated any, identifier string) error {
        if reflect.DeepEqual(original, migrated) {
                return nil
        }

        diff := difflib.UnifiedDiff{
                A:        difflib.SplitLines(spew.Sdump(original)),
                B:        difflib.SplitLines(spew.Sdump(migrated)),
                FromFile: "Expected",
                FromDate: "",
                ToFile:   "Actual",
                ToDate:   "",
                Context:  3,
        }
        diffText, _ := difflib.GetUnifiedDiffString(diff)

        return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
                diffText)
}
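
// As an illustration (with hypothetical values), a mismatch surfaces as
// ErrMigrationMismatch wrapping a unified diff of the spew dumps of the two
// objects:
//
//	err := compare(origNode, migNode, fmt.Sprintf("node %x", pub))
//	// migrated graph record does not match original record: node 02ab....
//	// --- Expected
//	// +++ Actual
//	// @@ -3,7 +3,7 @@
//	// -  Alias: (string) (len=5) "alice",
//	// +  Alias: (string) (len=3) "bob",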