lightningnetwork / lnd / 18197857992

02 Oct 2025 03:32PM UTC coverage: 66.622% (-0.02%) from 66.646%

Pull Request #10267: [g175] multi: small G175 preparations
Merge 0d9bfccfe into 1c2ff4a7e (via GitHub web-flow)

24 of 141 new or added lines in 12 files covered. (17.02%)

64 existing lines in 20 files now uncovered.

137216 of 205963 relevant lines covered (66.62%)

21302.01 hits per line

Source File: /graph/db/sql_migration.go

package graphdb

import (
        "bytes"
        "cmp"
        "context"
        "database/sql"
        "errors"
        "fmt"
        "net"
        "slices"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via
// tests only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        log.Infof("Starting migration of the graph store from KV to SQL")
        t0 := time.Now()

        // Check if there is a graph to migrate.
        graphExists, err := checkGraphExists(kvBackend)
        if err != nil {
                return fmt.Errorf("failed to check graph existence: %w", err)
        }
        if !graphExists {
                log.Infof("No graph found in KV store, skipping the migration")
                return nil
        }

        // 1) Migrate all the nodes.
        err = migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // 2) Migrate the source node.
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
                return fmt.Errorf("could not migrate source node: %w", err)
        }

        // 3) Migrate all the channels and channel policies.
        err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        // 4) Migrate the prune log.
        err = migratePruneLog(ctx, cfg.QueryCfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate prune log: %w", err)
        }

        // 5) Migrate the closed SCID index.
        err = migrateClosedSCIDIndex(ctx, cfg.QueryCfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate closed SCID index: %w",
                        err)
        }

        // 6) Migrate the zombie index.
        err = migrateZombieIndex(ctx, cfg.QueryCfg, kvBackend, sqlDB)
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        log.Infof("Finished migration of the graph store from KV to SQL in %v",
                time.Since(t0))

        return nil
}
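
// Illustrative sketch (not part of the original file): one way a test could
// drive this migration end to end. The kvBackend and sqlDB values are assumed
// to be an already-open kvdb.Backend and a SQLQueries implementation prepared
// by the test harness, and the config fields shown are only the ones this
// file actually reads (ChainHash and QueryCfg); everything else here is a
// hypothetical placeholder:
//
//      ctx := context.Background()
//      cfg := &SQLStoreConfig{
//              ChainHash: *chaincfg.MainNetParams.GenesisHash,
//              QueryCfg:  &sqldb.QueryConfig{MaxBatchSize: 1000},
//      }
//      if err := MigrateGraphToSQL(ctx, cfg, kvBackend, sqlDB); err != nil {
//              t.Fatalf("graph migration failed: %v", err)
//      }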

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
        // Check if there is even a graph to migrate.
        err := db.View(func(tx kvdb.RTx) error {
                // Check for the existence of the node bucket which is a top
                // level bucket that would have been created on the initial
                // creation of the graph store.
                nodes := tx.ReadBucket(nodeBucket)
                if nodes == nil {
                        return ErrGraphNotFound
                }

                return nil
        }, func() {})
        if errors.Is(err, ErrGraphNotFound) {
                return false, nil
        } else if err != nil {
                return false, err
        }

        return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// It collects nodes in batches, inserts them individually, and then validates
// them in batches.
func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        // Keep track of the number of nodes migrated and the number of
        // nodes skipped due to errors.
        var (
                totalTime = time.Now()

                count   uint64
                skipped uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        // batch is a map that holds node objects that have been migrated to
        // the native SQL store but have yet to be validated. The objects held
        // by this map were derived from the KVDB store, and so when they are
        // validated, the map index (the SQL store node ID) will be used to
        // fetch the corresponding node object in the SQL store, and it will
        // then be compared against the original KVDB node object.
        batch := make(
                map[int64]*models.Node, cfg.MaxBatchSize,
        )

        // validateBatch validates that the batch of nodes in the 'batch' map
        // have been migrated successfully.
        validateBatch := func() error {
                if len(batch) == 0 {
                        return nil
                }

                // Extract DB node IDs.
                dbIDs := make([]int64, 0, len(batch))
                for dbID := range batch {
                        dbIDs = append(dbIDs, dbID)
                }

                // Batch fetch all nodes from the database.
                dbNodes, err := sqlDB.GetNodesByIDs(ctx, dbIDs)
                if err != nil {
                        return fmt.Errorf("could not batch fetch nodes: %w",
                                err)
                }

                // Make sure that the number of nodes fetched matches the number
                // of nodes in the batch.
                if len(dbNodes) != len(batch) {
                        return fmt.Errorf("expected to fetch %d nodes, "+
                                "but got %d", len(batch), len(dbNodes))
                }

                // Now, batch fetch the normalised data for all the nodes in
                // the batch.
                batchData, err := batchLoadNodeData(ctx, cfg, sqlDB, dbIDs)
                if err != nil {
                        return fmt.Errorf("unable to batch load node data: %w",
                                err)
                }

                for _, dbNode := range dbNodes {
                        // Get the KVDB node info from the batch map.
                        node, ok := batch[dbNode.ID]
                        if !ok {
                                return fmt.Errorf("node with ID %d not found "+
                                        "in batch", dbNode.ID)
                        }

                        // Build the migrated node from the DB node and the
                        // batch node data.
                        migNode, err := buildNodeWithBatchData(
                                dbNode, batchData,
                        )
                        if err != nil {
                                return fmt.Errorf("could not build migrated "+
                                        "node from dbNode(db id: %d, node "+
                                        "pub: %x): %w", dbNode.ID,
                                        node.PubKeyBytes, err)
                        }

                        // Make sure that the node addresses are sorted before
                        // comparing them to ensure that the order of addresses
                        // does not affect the comparison.
                        slices.SortFunc(
                                node.Addresses, func(i, j net.Addr) int {
                                        return cmp.Compare(
                                                i.String(), j.String(),
                                        )
                                },
                        )
                        slices.SortFunc(
                                migNode.Addresses, func(i, j net.Addr) int {
                                        return cmp.Compare(
                                                i.String(), j.String(),
                                        )
                                },
                        )

                        err = sqldb.CompareRecords(
                                node, migNode,
                                fmt.Sprintf("node %x", node.PubKeyBytes),
                        )
                        if err != nil {
                                return fmt.Errorf("node mismatch after "+
                                        "migration for node %x: %w",
                                        node.PubKeyBytes, err)
                        }
                }

                // Clear the batch map for the next iteration.
                batch = make(
                        map[int64]*models.Node, cfg.MaxBatchSize,
                )

                return nil
        }

        // Loop through each node in the KV store and insert it into the SQL
        // database.
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
                node *models.Node) error {

                pub := node.PubKeyBytes

                // Sanity check to ensure that the node has valid extra opaque
                // data. If it does not, we'll skip it. We need to do this
                // because previously we would just persist any TLV bytes that
                // we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a node with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "extra opaque data: %v", pub,
                                node.ExtraOpaqueData)

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra "+
                                "opaque data for node %x: %w", pub, err)
                }

                if err = maybeOverrideNodeAddresses(node); err != nil {
                        skipped++
                        log.Warnf("Skipping migration of node %x with invalid "+
                                "address (%v): %v", pub, node.Addresses, err)

                        return nil
                }

                count++
                chunk++

                // Write the node to the SQL database.
                id, err := insertNodeSQLMig(ctx, sqlDB, node)
                if err != nil {
                        return fmt.Errorf("could not persist node(%x): %w", pub,
                                err)
                }

                // Add to validation batch.
                batch[id] = node

                // Validate batch when full.
                if len(batch) >= int(cfg.MaxBatchSize) {
                        err := validateBatch()
                        if err != nil {
                                return fmt.Errorf("batch validation failed: %w",
                                        err)
                        }
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
                                count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                count = 0
                chunk = 0
                skipped = 0
                t0 = time.Now()
                batch = make(map[int64]*models.Node, cfg.MaxBatchSize)
        })
        if err != nil {
                return fmt.Errorf("could not migrate nodes: %w", err)
        }

        // Validate any remaining nodes in the batch.
        if len(batch) > 0 {
                err := validateBatch()
                if err != nil {
                        return fmt.Errorf("final batch validation failed: %w",
                                err)
                }
        }

        log.Infof("Migrated %d nodes from KV to SQL in %v (skipped %d nodes "+
                "due to invalid TLV streams or invalid addresses)", count,
                time.Since(totalTime), skipped)

        return nil
}

// maybeOverrideNodeAddresses checks if the node has any opaque addresses that
// can be parsed. If so, it replaces the node's addresses with the parsed
// addresses. If the address is unparseable, it returns an error.
func maybeOverrideNodeAddresses(node *models.Node) error {
        // In the majority of cases, the number of node addresses will remain
        // unchanged, so we pre-allocate a slice of the same length.
        addrs := make([]net.Addr, 0, len(node.Addresses))

        // Iterate over each address in search of any opaque addresses that we
        // can inspect.
        for _, addr := range node.Addresses {
                opaque, ok := addr.(*lnwire.OpaqueAddrs)
                if !ok {
                        // Any non-opaque address is left unchanged.
                        addrs = append(addrs, addr)
                        continue
                }

                // For each opaque address, we'll now attempt to parse out any
                // known addresses. We'll do this in a loop, as it's possible
                // that there are several addresses encoded in a single opaque
                // address.
                payload := opaque.Payload
                for len(payload) > 0 {
                        var (
                                r            = bytes.NewReader(payload)
                                numAddrBytes = uint16(len(payload))
                        )
                        byteRead, readAddr, err := lnwire.ReadAddress(
                                r, numAddrBytes,
                        )
                        if err != nil {
                                return err
                        }

                        // If we were able to read an address, we'll add it to
                        // our list of addresses.
                        if readAddr != nil {
                                addrs = append(addrs, readAddr)
                        }

                        // If the address we read was an opaque address, it
                        // means we've hit an unknown address type, and it has
                        // consumed the rest of the payload. We can break out
                        // of the loop.
                        if _, ok := readAddr.(*lnwire.OpaqueAddrs); ok {
                                break
                        }

                        // If we've read all the bytes, we can also break.
                        if byteRead >= numAddrBytes {
                                break
                        }

                        // Otherwise, we'll advance our payload slice and
                        // continue.
                        payload = payload[byteRead:]
                }
        }

        // Override the node addresses if we have any.
        if len(addrs) != 0 {
                node.Addresses = addrs
        }

        return nil
}
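
// Illustrative sketch (not part of the original file): the kind of rewrite
// maybeOverrideNodeAddresses performs. The payload below is a hand-built
// example of a BOLT #7 style address descriptor (assumed type 0x01 for IPv4:
// 4 address bytes followed by a 2-byte big-endian port), so the exact bytes
// and the resulting parsed address type are assumptions for illustration
// only:
//
//      node := &models.Node{
//              Addresses: []net.Addr{
//                      &lnwire.OpaqueAddrs{
//                              // 127.0.0.1:9735 (0x2607 == 9735).
//                              Payload: []byte{0x01, 127, 0, 0, 1, 0x26, 0x07},
//                      },
//              },
//      }
//      if err := maybeOverrideNodeAddresses(node); err == nil {
//              // node.Addresses now holds the parsed TCP address instead of
//              // the opaque payload.
//      }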

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
        sqlDB SQLQueries) error {

        log.Debugf("Migrating source node from KV to SQL")

        sourceNode, err := sourceNode(kvdb)
        if errors.Is(err, ErrSourceNodeNotSet) {
                // If the source node has not been set yet, we can skip this
                // migration step.
                return nil
        } else if err != nil {
                return fmt.Errorf("could not get source node from kv "+
                        "store: %w", err)
        }

        pub := sourceNode.PubKeyBytes

        // Get the DB ID of the source node by its public key. This node must
        // already exist in the SQL database, as it should have been migrated
        // in the previous node-migration step.
        id, err := sqlDB.GetNodeIDByPubKey(
                ctx, sqlc.GetNodeIDByPubKeyParams{
                        PubKey:  pub[:],
                        Version: int16(lnwire.GossipVersion1),
                },
        )
        if err != nil {
                return fmt.Errorf("could not get source node ID: %w", err)
        }

        // Now we can add the source node to the SQL database.
        err = sqlDB.AddSourceNode(ctx, id)
        if err != nil {
                return fmt.Errorf("could not add source node to SQL store: %w",
                        err)
        }

        // Verify that the source node was added correctly by fetching it back
        // from the SQL database and checking that the expected DB ID and
        // pub key are returned. We don't need to do a whole node comparison
        // here, as this was already done in the previous migration step.
        srcNodes, err := sqlDB.GetSourceNodesByVersion(
                ctx, int16(lnwire.GossipVersion1),
        )
        if err != nil {
                return fmt.Errorf("could not get source nodes from SQL "+
                        "store: %w", err)
        }

        // The SQL store has support for multiple source nodes (for future
        // protocol versions) but this migration is purely aimed at the V1
        // store, and so we expect exactly one source node to be present.
        if len(srcNodes) != 1 {
                return fmt.Errorf("expected exactly one source node, "+
                        "got %d", len(srcNodes))
        }

        // Check that the source node ID and pub key match the original
        // source node.
        if srcNodes[0].NodeID != id {
                return fmt.Errorf("source node ID mismatch after migration: "+
                        "expected %d, got %d", id, srcNodes[0].NodeID)
        }
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
        if err != nil {
                return fmt.Errorf("source node pubkey mismatch after "+
                        "migration: %w", err)
        }

        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

        return nil
}

// migChanInfo holds the information about a channel and its policies.
type migChanInfo struct {
        // edge is the channel object as read from the KVDB source.
        edge *models.ChannelEdgeInfo

        // policy1 is the first channel policy for the channel as read from
        // the KVDB source.
        policy1 *models.ChannelEdgePolicy

        // policy2 is the second channel policy for the channel as read
        // from the KVDB source.
        policy2 *models.ChannelEdgePolicy

        // dbInfo holds location info (in the form of DB IDs) of the channel
        // and its policies in the native-SQL destination.
        dbInfo *dbChanInfo
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        var (
                totalTime = time.Now()

                channelCount       uint64
                skippedChanCount   uint64
                policyCount        uint64
                skippedPolicyCount uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )
        migChanPolicy := func(dbChanInfo *dbChanInfo,
                policy *models.ChannelEdgePolicy) error {

                // If the policy is nil, we can skip it.
                if policy == nil {
                        return nil
                }

                // Unlike the special case of invalid TLV bytes for node and
                // channel announcements, we don't need to handle the case for
                // channel policies here because it is already handled in the
                // `forEachChannel` function. If the policy has invalid TLV
                // bytes, then `nil` will be passed to this function.

                policyCount++

                err := insertChanEdgePolicyMig(ctx, sqlDB, dbChanInfo, policy)
                if err != nil {
                        return fmt.Errorf("could not migrate channel "+
                                "policy %d: %w", policy.ChannelID, err)
                }

                return nil
        }

        // batch is used to collect migrated channel info that we will
        // batch-validate. Each entry is indexed by the DB ID of the channel
        // in the SQL database.
        batch := make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)

        // Iterate over each channel in the KV store and migrate it and its
        // policies to the SQL database.
        err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
                policy1 *models.ChannelEdgePolicy,
                policy2 *models.ChannelEdgePolicy) error {

                scid := channel.ChannelID

                // Here, we do a sanity check to ensure that the chain hash of
                // the channel returned by the KV store matches the expected
                // chain hash. This is important since in the SQL store, we will
                // no longer explicitly store the chain hash in the channel
                // info, but rather rely on the chain hash LND is running with.
                // So this is our way of ensuring that LND is running on the
                // correct network at migration time.
                if channel.ChainHash != cfg.ChainHash {
                        return fmt.Errorf("channel %d has chain hash %s, "+
                                "expected %s", scid, channel.ChainHash,
                                cfg.ChainHash)
                }

                // Sanity check to ensure that the channel has valid extra
                // opaque data. If it does not, we'll skip it. We need to do
                // this because previously we would just persist any TLV bytes
                // that we received without validating them. Now, however, we
                // normalise the storage of extra opaque data, so we need to
                // ensure that the data is valid. We don't want to abort the
                // migration if we encounter a channel with invalid extra opaque
                // data, so we'll just skip it and log a warning.
                _, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
                if errors.Is(err, ErrParsingExtraTLVBytes) {
                        log.Warnf("Skipping channel %d with invalid "+
                                "extra opaque data: %v", scid,
                                channel.ExtraOpaqueData)

                        skippedChanCount++

                        // If we skip a channel, we also skip its policies.
                        if policy1 != nil {
                                skippedPolicyCount++
                        }
                        if policy2 != nil {
                                skippedPolicyCount++
                        }

                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to marshal extra opaque "+
                                "data for channel %d (%v): %w", scid,
                                channel.ExtraOpaqueData, err)
                }

                channelCount++
                chunk++

                // Migrate the channel info along with its policies.
                dbChanInfo, err := insertChannelMig(ctx, sqlDB, channel)
                if err != nil {
                        return fmt.Errorf("could not insert record for "+
                                "channel %d in SQL store: %w", scid, err)
                }

                // Now, migrate the two channel policies for the channel.
                err = migChanPolicy(dbChanInfo, policy1)
                if err != nil {
                        return fmt.Errorf("could not migrate policy1(%d): %w",
                                scid, err)
                }
                err = migChanPolicy(dbChanInfo, policy2)
                if err != nil {
                        return fmt.Errorf("could not migrate policy2(%d): %w",
                                scid, err)
                }

                // Collect the migrated channel info and policies in a batch for
                // later validation.
                batch[dbChanInfo.channelID] = &migChanInfo{
                        edge:    channel,
                        policy1: policy1,
                        policy2: policy2,
                        dbInfo:  dbChanInfo,
                }

                if len(batch) >= int(cfg.QueryCfg.MaxBatchSize) {
                        // Do batch validation.
                        err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
                        if err != nil {
                                return fmt.Errorf("could not validate "+
                                        "channel batch: %w", err)
                        }

                        batch = make(
                                map[int64]*migChanInfo,
                                cfg.QueryCfg.MaxBatchSize,
                        )
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d channels (%.2f channels/sec)",
                                channelCount, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                channelCount = 0
                policyCount = 0
                chunk = 0
                skippedChanCount = 0
                skippedPolicyCount = 0
                t0 = time.Now()
                batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
        })
        if err != nil {
                return fmt.Errorf("could not migrate channels and policies: %w",
                        err)
        }

        if len(batch) > 0 {
                // Do a final batch validation for any remaining channels.
                err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
                if err != nil {
                        return fmt.Errorf("could not validate final channel "+
                                "batch: %w", err)
                }

                batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
        }

        log.Infof("Migrated %d channels and %d policies from KV to SQL in %s "+
                "(skipped %d channels and %d policies due to invalid TLV "+
                "streams)", channelCount, policyCount, time.Since(totalTime),
                skippedChanCount, skippedPolicyCount)

        return nil
}

// validateMigratedChannels validates the channels in the batch after they have
// been migrated to the SQL database. It batch fetches all channels by their IDs
// and compares the migrated channels and their policies with the original ones
// to ensure they match using batch construction patterns.
func validateMigratedChannels(ctx context.Context, cfg *SQLStoreConfig,
        sqlDB SQLQueries, batch map[int64]*migChanInfo) error {

        // Convert batch keys (DB IDs) to an int slice for the batch query.
        dbChanIDs := make([]int64, 0, len(batch))
        for id := range batch {
                dbChanIDs = append(dbChanIDs, id)
        }

        // Batch fetch all channels with their policies.
        rows, err := sqlDB.GetChannelsByIDs(ctx, dbChanIDs)
        if err != nil {
                return fmt.Errorf("could not batch get channels by IDs: %w",
                        err)
        }

        // Sanity check that the same number of channels were returned
        // as requested.
        if len(rows) != len(dbChanIDs) {
                return fmt.Errorf("expected to fetch %d channels, "+
                        "but got %d", len(dbChanIDs), len(rows))
        }

        // Collect all policy IDs needed for batch data loading.
        dbPolicyIDs := make([]int64, 0, len(dbChanIDs)*2)

        for _, row := range rows {
                scid := byteOrder.Uint64(row.GraphChannel.Scid)

                dbPol1, dbPol2, err := extractChannelPolicies(row)
                if err != nil {
                        return fmt.Errorf("could not extract channel policies"+
                                " for SCID %d: %w", scid, err)
                }
                if dbPol1 != nil {
                        dbPolicyIDs = append(dbPolicyIDs, dbPol1.ID)
                }
                if dbPol2 != nil {
                        dbPolicyIDs = append(dbPolicyIDs, dbPol2.ID)
                }
        }

        // Batch load all channel and policy data (features, extras).
        batchData, err := batchLoadChannelData(
                ctx, cfg.QueryCfg, sqlDB, dbChanIDs, dbPolicyIDs,
        )
        if err != nil {
                return fmt.Errorf("could not batch load channel and policy "+
                        "data: %w", err)
        }

        // Validate each channel in the batch using pre-loaded data.
        for _, row := range rows {
                kvdbChan, ok := batch[row.GraphChannel.ID]
                if !ok {
                        return fmt.Errorf("channel with ID %d not found "+
                                "in batch", row.GraphChannel.ID)
                }

                scid := byteOrder.Uint64(row.GraphChannel.Scid)

                err = validateMigratedChannelWithBatchData(
                        cfg, scid, kvdbChan, row, batchData,
                )
                if err != nil {
                        return fmt.Errorf("channel %d validation failed "+
                                "after migration: %w", scid, err)
                }
        }

        return nil
}

// validateMigratedChannelWithBatchData validates a single migrated channel
// using pre-fetched batch data for optimal performance.
func validateMigratedChannelWithBatchData(cfg *SQLStoreConfig,
        scid uint64, info *migChanInfo, row sqlc.GetChannelsByIDsRow,
        batchData *batchChannelData) error {

        dbChanInfo := info.dbInfo
        channel := info.edge

        // Assert that the DB IDs for the channel and nodes are as expected
        // given the inserted channel info.
        err := sqldb.CompareRecords(
                dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node1ID, row.Node1ID, "node1 DB ID",
        )
        if err != nil {
                return err
        }
        err = sqldb.CompareRecords(
                dbChanInfo.node2ID, row.Node2ID, "node2 DB ID",
        )
        if err != nil {
                return err
        }

        // Build node vertices from the row data.
        node1, node2, err := buildNodeVertices(
                row.Node1PubKey, row.Node2PubKey,
        )
        if err != nil {
                return err
        }

        // Build channel info using batch data.
        migChan, err := buildEdgeInfoWithBatchData(
                cfg.ChainHash, row.GraphChannel, node1, node2, batchData,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel info: %w",
                        err)
        }

        // Extract channel policies from the row.
        dbPol1, dbPol2, err := extractChannelPolicies(row)
        if err != nil {
                return fmt.Errorf("could not extract channel policies: %w", err)
        }

        // Build channel policies using batch data.
        migPol1, migPol2, err := buildChanPoliciesWithBatchData(
                dbPol1, dbPol2, scid, node1, node2, batchData,
        )
        if err != nil {
                return fmt.Errorf("could not build migrated channel "+
                        "policies: %w", err)
        }

        // Finally, compare the original channel info and
        // policies with the migrated ones to ensure they match.
        if len(channel.ExtraOpaqueData) == 0 {
                channel.ExtraOpaqueData = nil
        }
        if len(migChan.ExtraOpaqueData) == 0 {
                migChan.ExtraOpaqueData = nil
        }

        err = sqldb.CompareRecords(
                channel, migChan, fmt.Sprintf("channel %d", scid),
        )
        if err != nil {
                return err
        }

        checkPolicy := func(expPolicy,
                migPolicy *models.ChannelEdgePolicy) error {

                switch {
                // Both policies are nil, nothing to compare.
                case expPolicy == nil && migPolicy == nil:
                        return nil

                // One of the policies is nil, but the other is not.
                case expPolicy == nil || migPolicy == nil:
                        return fmt.Errorf("expected both policies to be "+
                                "non-nil. Got expPolicy: %v, "+
                                "migPolicy: %v", expPolicy, migPolicy)

                // Both policies are non-nil, we can compare them.
                default:
                }

                if len(expPolicy.ExtraOpaqueData) == 0 {
                        expPolicy.ExtraOpaqueData = nil
                }
                if len(migPolicy.ExtraOpaqueData) == 0 {
                        migPolicy.ExtraOpaqueData = nil
                }

                return sqldb.CompareRecords(
                        *expPolicy, *migPolicy, "channel policy",
                )
        }

        err = checkPolicy(info.policy1, migPol1)
        if err != nil {
                return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
                        err)
        }

        err = checkPolicy(info.policy2, migPol2)
        if err != nil {
                return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
                        err)
        }

        return nil
}
884

885
// migratePruneLog migrates the prune log from the KV backend to the SQL
886
// database. It collects entries in batches, inserts them individually, and then
887
// validates them in batches using GetPruneEntriesForHeights for better i
888
// performance.
889
func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig,
890
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {
×
891

×
892
        var (
×
893
                totalTime = time.Now()
×
894

×
895
                count          uint64
×
896
                pruneTipHeight uint32
×
897
                pruneTipHash   chainhash.Hash
×
898

×
899
                t0    = time.Now()
×
900
                chunk uint64
×
901
                s     = rate.Sometimes{
×
902
                        Interval: 10 * time.Second,
×
903
                }
×
904
        )
×
905

×
906
        batch := make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)
×
907

×
908
        // validateBatch validates a batch of prune entries using batch query.
×
909
        validateBatch := func() error {
×
910
                if len(batch) == 0 {
×
911
                        return nil
×
912
                }
×
913

914
                // Extract heights for the batch query.
915
                heights := make([]int64, 0, len(batch))
×
916
                for height := range batch {
×
917
                        heights = append(heights, int64(height))
×
918
                }
×
919

920
                // Batch fetch all entries from the database.
921
                rows, err := sqlDB.GetPruneEntriesForHeights(ctx, heights)
×
922
                if err != nil {
×
923
                        return fmt.Errorf("could not batch get prune "+
×
924
                                "entries: %w", err)
×
925
                }
×
926

927
                if len(rows) != len(batch) {
×
928
                        return fmt.Errorf("expected to fetch %d prune "+
×
929
                                "entries, but got %d", len(batch),
×
930
                                len(rows))
×
931
                }
×
932

933
                // Validate each entry in the batch.
934
                for _, row := range rows {
×
935
                        kvdbHash, ok := batch[uint32(row.BlockHeight)]
×
936
                        if !ok {
×
937
                                return fmt.Errorf("prune entry for height %d "+
×
938
                                        "not found in batch", row.BlockHeight)
×
939
                        }
×
940

941
                        err := sqldb.CompareRecords(
×
942
                                kvdbHash[:], row.BlockHash,
×
943
                                fmt.Sprintf("prune log entry at height %d",
×
944
                                        row.BlockHash),
×
945
                        )
×
946
                        if err != nil {
×
947
                                return err
×
948
                        }
×
949
                }
950

951
                // Reset the batch map for the next iteration.
952
                batch = make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)
×
953

×
954
                return nil
×
955
        }
956

957
        // Iterate over each prune log entry in the KV store and migrate it to
958
        // the SQL database.
959
        err := forEachPruneLogEntry(
×
960
                kvBackend, func(height uint32, hash *chainhash.Hash) error {
×
961
                        count++
×
962
                        chunk++
×
963

×
964
                        // Keep track of the prune tip height and hash.
×
965
                        if height > pruneTipHeight {
×
966
                                pruneTipHeight = height
×
967
                                pruneTipHash = *hash
×
968
                        }
×
969

970
                        // Insert the entry (individual inserts for now).
971
                        err := sqlDB.UpsertPruneLogEntry(
×
972
                                ctx, sqlc.UpsertPruneLogEntryParams{
×
973
                                        BlockHeight: int64(height),
×
974
                                        BlockHash:   hash[:],
×
975
                                },
×
976
                        )
×
977
                        if err != nil {
×
978
                                return fmt.Errorf("unable to insert prune log "+
×
979
                                        "entry for height %d: %w", height, err)
×
980
                        }
×
981

982
                        // Add to validation batch.
983
                        batch[height] = *hash
×
984

×
985
                        // Validate batch when full.
×
986
                        if len(batch) >= int(cfg.MaxBatchSize) {
×
987
                                err := validateBatch()
×
988
                                if err != nil {
×
989
                                        return fmt.Errorf("batch "+
×
990
                                                "validation failed: %w", err)
×
991
                                }
×
992
                        }
993

994
                        s.Do(func() {
×
995
                                elapsed := time.Since(t0).Seconds()
×
996
                                ratePerSec := float64(chunk) / elapsed
×
997
                                log.Debugf("Migrated %d prune log "+
×
998
                                        "entries (%.2f entries/sec)",
×
999
                                        count, ratePerSec)
×
1000

×
1001
                                t0 = time.Now()
×
1002
                                chunk = 0
×
1003
                        })
×
1004

1005
                        return nil
×
1006
                },
1007
                func() {
×
1008
                        count = 0
×
1009
                        chunk = 0
×
1010
                        t0 = time.Now()
×
1011
                        batch = make(
×
1012
                                map[uint32]chainhash.Hash, cfg.MaxBatchSize,
×
1013
                        )
×
1014
                },
×
1015
        )
1016
        if err != nil {
×
1017
                return fmt.Errorf("could not migrate prune log: %w", err)
×
1018
        }
×
1019

1020
        // Validate any remaining entries in the batch.
1021
        if len(batch) > 0 {
×
1022
                err := validateBatch()
×
1023
                if err != nil {
×
1024
                        return fmt.Errorf("final batch validation failed: %w",
×
1025
                                err)
×
1026
                }
×
1027
        }
1028

1029
        // Check that the prune tip is set correctly in the SQL
1030
        // database.
1031
        pruneTip, err := sqlDB.GetPruneTip(ctx)
×
1032
        if errors.Is(err, sql.ErrNoRows) {
×
1033
                // The ErrGraphNeverPruned error is expected if no prune log
×
1034
                // entries were migrated from the kvdb store. Otherwise, it's
×
1035
                // an unexpected error.
×
1036
                if count == 0 {
×
1037
                        log.Infof("No prune log entries found in KV store " +
×
1038
                                "to migrate")
×
1039
                        return nil
×
1040
                }
×
1041
                // Fall-through to the next error check.
1042
        }
1043
        if err != nil {
×
1044
                return fmt.Errorf("could not get prune tip: %w", err)
×
1045
        }
×
1046

1047
        if pruneTip.BlockHeight != int64(pruneTipHeight) ||
×
1048
                !bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {
×
1049

×
1050
                return fmt.Errorf("prune tip mismatch after migration: "+
×
1051
                        "expected height %d, hash %s; got height %d, "+
×
1052
                        "hash %s", pruneTipHeight, pruneTipHash,
×
1053
                        pruneTip.BlockHeight,
×
1054
                        chainhash.Hash(pruneTip.BlockHash))
×
1055
        }
×
1056

1057
        log.Infof("Migrated %d prune log entries from KV to SQL in %s. "+
×
1058
                "The prune tip is: height %d, hash: %s", count,
×
1059
                time.Since(totalTime), pruneTipHeight, pruneTipHash)
×
1060

×
1061
        return nil
×
1062
}
1063

1064
// forEachPruneLogEntry iterates over each prune log entry in the KV
1065
// backend and calls the provided callback function for each entry.
1066
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
1067
        hash *chainhash.Hash) error, reset func()) error {
×
1068

×
1069
        return kvdb.View(db, func(tx kvdb.RTx) error {
×
1070
                metaBucket := tx.ReadBucket(graphMetaBucket)
×
1071
                if metaBucket == nil {
×
1072
                        return ErrGraphNotFound
×
1073
                }
×
1074

1075
                pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
×
1076
                if pruneBucket == nil {
×
1077
                        // The graph has never been pruned and so, there are no
×
1078
                        // entries to iterate over.
×
1079
                        return nil
×
1080
                }
×
1081

1082
                return pruneBucket.ForEach(func(k, v []byte) error {
×
1083
                        blockHeight := byteOrder.Uint32(k)
×
1084
                        var blockHash chainhash.Hash
×
1085
                        copy(blockHash[:], v)
×
1086

×
1087
                        return cb(blockHeight, &blockHash)
×
1088
                })
×
1089
        }, reset)
1090
}
1091

1092
// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
1093
// the SQL database. It collects SCIDs in batches, inserts them individually,
1094
// and then validates them in batches using GetClosedChannelsSCIDs for better
1095
// performance.
1096
func migrateClosedSCIDIndex(ctx context.Context, cfg *sqldb.QueryConfig,
1097
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {
×
1098

×
1099
        var (
×
1100
                totalTime = time.Now()
×
1101

×
1102
                count uint64
×
1103

×
1104
                t0    = time.Now()
×
1105
                chunk uint64
×
1106
                s     = rate.Sometimes{
×
1107
                        Interval: 10 * time.Second,
×
1108
                }
×
1109
        )
×
1110

×
1111
        batch := make([][]byte, 0, cfg.MaxBatchSize)
×
1112

×
1113
        // validateBatch validates a batch of closed SCIDs using batch query.
×
1114
        validateBatch := func() error {
×
1115
                if len(batch) == 0 {
×
1116
                        return nil
×
1117
                }
×
1118

1119
                // Batch fetch all closed SCIDs from the database.
1120
                dbSCIDs, err := sqlDB.GetClosedChannelsSCIDs(ctx, batch)
×
1121
                if err != nil {
×
1122
                        return fmt.Errorf("could not batch get closed "+
×
1123
                                "SCIDs: %w", err)
×
1124
                }
×
1125

1126
                // Create set of SCIDs that exist in the database for quick
1127
                // lookup.
1128
                dbSCIDSet := make(map[string]struct{})
×
1129
                for _, scid := range dbSCIDs {
×
1130
                        dbSCIDSet[string(scid)] = struct{}{}
×
1131
                }
×
1132

1133
                // Validate each SCID in the batch.
1134
                for _, expectedSCID := range batch {
×
1135
                        if _, found := dbSCIDSet[string(expectedSCID)]; !found {
×
1136
                                return fmt.Errorf("closed SCID %x not found "+
×
1137
                                        "in database", expectedSCID)
×
1138
                        }
×
1139
                }
1140

1141
                // Reset the batch for the next iteration.
1142
                batch = make([][]byte, 0, cfg.MaxBatchSize)
×
1143

×
1144
                return nil
×
1145
        }
1146

1147
        migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
×
1148
                count++
×
1149
                chunk++
×
1150

×
1151
                chanIDB := channelIDToBytes(scid.ToUint64())
×
1152
                err := sqlDB.InsertClosedChannel(ctx, chanIDB)
×
1153
                if err != nil {
×
1154
                        return fmt.Errorf("could not insert closed channel "+
×
1155
                                "with SCID %s: %w", scid, err)
×
1156
                }
×
1157

1158
                // Add to validation batch.
1159
                batch = append(batch, chanIDB)
×
1160

×
1161
                // Validate batch when full.
×
1162
                if len(batch) >= int(cfg.MaxBatchSize) {
×
1163
                        err := validateBatch()
×
1164
                        if err != nil {
×
1165
                                return fmt.Errorf("batch validation failed: %w",
×
1166
                                        err)
×
1167
                        }
×
1168
                }
1169

1170
                s.Do(func() {
×
1171
                        elapsed := time.Since(t0).Seconds()
×
1172
                        ratePerSec := float64(chunk) / elapsed
×
1173
                        log.Debugf("Migrated %d closed scids "+
×
1174
                                "(%.2f entries/sec)", count, ratePerSec)
×
1175

×
1176
                        t0 = time.Now()
×
1177
                        chunk = 0
×
1178
                })
×
1179

1180
                return nil
×
1181
        }
1182

1183
        err := forEachClosedSCID(
×
1184
                kvBackend, migrateSingleClosedSCID, func() {
×
1185
                        count = 0
×
1186
                        chunk = 0
×
1187
                        t0 = time.Now()
×
1188
                        batch = make([][]byte, 0, cfg.MaxBatchSize)
×
1189
                },
×
1190
        )
1191
        if err != nil {
×
1192
                return fmt.Errorf("could not migrate closed SCID index: %w",
×
1193
                        err)
×
1194
        }
×
1195

1196
        // Validate any remaining SCIDs in the batch.
1197
        if len(batch) > 0 {
×
1198
                err := validateBatch()
×
1199
                if err != nil {
×
1200
                        return fmt.Errorf("final batch validation failed: %w",
×
1201
                                err)
×
1202
                }
×
1203
        }
1204

1205
        log.Infof("Migrated %d closed SCIDs from KV to SQL in %s", count,
×
1206
                time.Since(totalTime))
×
1207

×
1208
        return nil
×
1209
}

// migrateZombieIndex migrates the zombie index from the KV backend to the SQL
// database. It collects zombie channels in batches, inserts them individually,
// and validates them in batches.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,
        kvBackend kvdb.Backend, sqlDB SQLQueries) error {

        var (
                totalTime = time.Now()

                count uint64

                t0    = time.Now()
                chunk uint64
                s     = rate.Sometimes{
                        Interval: 10 * time.Second,
                }
        )

        type zombieEntry struct {
                pub1 route.Vertex
                pub2 route.Vertex
        }

        batch := make(map[uint64]*zombieEntry, cfg.MaxBatchSize)

        // validateBatch validates a batch of zombie SCIDs using a batch query.
        validateBatch := func() error {
                if len(batch) == 0 {
                        return nil
                }

                scids := make([][]byte, 0, len(batch))
                for scid := range batch {
                        scids = append(scids, channelIDToBytes(scid))
                }

                // Batch fetch all zombie channels from the database.
                rows, err := sqlDB.GetZombieChannelsSCIDs(
                        ctx, sqlc.GetZombieChannelsSCIDsParams{
                                Version: int16(lnwire.GossipVersion1),
                                Scids:   scids,
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not batch get zombie "+
                                "SCIDs: %w", err)
                }

                // Make sure that the number of rows returned matches
                // the number of SCIDs we requested.
                if len(rows) != len(scids) {
                        return fmt.Errorf("expected to fetch %d zombie "+
                                "SCIDs, but got %d", len(scids), len(rows))
                }

                // Validate that each row is in the batch.
                for _, row := range rows {
                        scid := byteOrder.Uint64(row.Scid)

                        kvdbZombie, ok := batch[scid]
                        if !ok {
                                return fmt.Errorf("zombie SCID %x not found "+
                                        "in batch", scid)
                        }

                        err = sqldb.CompareRecords(
                                kvdbZombie.pub1[:], row.NodeKey1,
                                fmt.Sprintf("zombie pub key 1 (%s) for "+
                                        "channel %d", kvdbZombie.pub1, scid),
                        )
                        if err != nil {
                                return err
                        }

                        err = sqldb.CompareRecords(
                                kvdbZombie.pub2[:], row.NodeKey2,
                                fmt.Sprintf("zombie pub key 2 (%s) for "+
                                        "channel %d", kvdbZombie.pub2, scid),
                        )
                        if err != nil {
                                return err
                        }
                }

                // Reset the batch for the next iteration.
                batch = make(map[uint64]*zombieEntry, cfg.MaxBatchSize)

                return nil
        }

        err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
                pubKey2 [33]byte) error {

                chanIDB := channelIDToBytes(chanID)

                // If it is in the closed SCID index, we don't need to
                // add it to the zombie index.
                //
                // NOTE: this means that the resulting zombie index count in
                // the SQL store may well be less than the count of zombie
                // channels in the KV store.
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
                if err != nil {
                        return fmt.Errorf("could not check closed "+
                                "channel: %w", err)
                }
                if isClosed {
                        return nil
                }

                count++
                chunk++

                err = sqlDB.UpsertZombieChannel(
                        ctx, sqlc.UpsertZombieChannelParams{
                                Version:  int16(lnwire.GossipVersion1),
                                Scid:     chanIDB,
                                NodeKey1: pubKey1[:],
                                NodeKey2: pubKey2[:],
                        },
                )
                if err != nil {
                        return fmt.Errorf("could not upsert zombie "+
                                "channel %d: %w", chanID, err)
                }

                // Add to validation batch only after successful insertion.
                batch[chanID] = &zombieEntry{
                        pub1: pubKey1,
                        pub2: pubKey2,
                }

                // Validate batch when full.
                if len(batch) >= int(cfg.MaxBatchSize) {
                        err := validateBatch()
                        if err != nil {
                                return fmt.Errorf("batch validation failed: %w",
                                        err)
                        }
                }

                s.Do(func() {
                        elapsed := time.Since(t0).Seconds()
                        ratePerSec := float64(chunk) / elapsed
                        log.Debugf("Migrated %d zombie index entries "+
                                "(%.2f entries/sec)", count, ratePerSec)

                        t0 = time.Now()
                        chunk = 0
                })

                return nil
        }, func() {
                count = 0
                chunk = 0
                t0 = time.Now()
                batch = make(map[uint64]*zombieEntry, cfg.MaxBatchSize)
        })
        if err != nil {
                return fmt.Errorf("could not migrate zombie index: %w", err)
        }

        // Validate any remaining zombie SCIDs in the batch.
        if len(batch) > 0 {
                err := validateBatch()
                if err != nil {
                        return fmt.Errorf("final batch validation failed: %w",
                                err)
                }
        }

        log.Infof("Migrated %d zombie channels from KV to SQL in %s", count,
                time.Since(totalTime))

        return nil
}
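
// batchedValidate is an illustrative, editorial sketch (not used anywhere in
// the migration) of the "insert individually, validate in batches" pattern
// that both migrateClosedSCIDIndex and migrateZombieIndex follow: entries are
// collected until the batch reaches maxSize, a validation callback is
// flushed, and any partial batch left at the end is flushed separately.
func batchedValidate(items []uint64, maxSize int,
        validate func(batch []uint64) error) error {

        batch := make([]uint64, 0, maxSize)
        for _, item := range items {
                batch = append(batch, item)

                if len(batch) < maxSize {
                        continue
                }

                if err := validate(batch); err != nil {
                        return err
                }
                batch = batch[:0]
        }

        // Flush the final, partial batch, mirroring the len(batch) > 0 check
        // at the end of the migration functions above.
        if len(batch) > 0 {
                return validate(batch)
        }

        return nil
}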

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
        pubKey2 [33]byte) error, reset func()) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                edges := tx.ReadBucket(edgeBucket)
                if edges == nil {
                        return ErrGraphNoEdgesFound
                }
                zombieIndex := edges.NestedReadBucket(zombieBucket)
                if zombieIndex == nil {
                        return nil
                }

                return zombieIndex.ForEach(func(k, v []byte) error {
                        var pubKey1, pubKey2 [33]byte
                        copy(pubKey1[:], v[:33])
                        copy(pubKey2[:], v[33:])

                        return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
                })
        }, reset)
}
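
// decodeZombieValue is a hypothetical helper added for illustration only: it
// spells out the zombie bucket layout that forEachZombieEntry relies on,
// namely an 8-byte SCID key (decoded with byteOrder) and a 66-byte value
// holding the two 33-byte node public keys back to back. Unlike the callback
// above, it also checks the lengths before copying.
func decodeZombieValue(k, v []byte) (uint64, [33]byte, [33]byte, error) {
        var pubKey1, pubKey2 [33]byte
        if len(k) != 8 || len(v) != 66 {
                return 0, pubKey1, pubKey2, fmt.Errorf("unexpected zombie "+
                        "entry size: key=%d, value=%d", len(k), len(v))
        }

        copy(pubKey1[:], v[:33])
        copy(pubKey2[:], v[33:])

        return byteOrder.Uint64(k), pubKey1, pubKey2, nil
}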

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
        cb func(lnwire.ShortChannelID) error, reset func()) error {

        return kvdb.View(db, func(tx kvdb.RTx) error {
                closedScids := tx.ReadBucket(closedScidBucket)
                if closedScids == nil {
                        return nil
                }

                return closedScids.ForEach(func(k, _ []byte) error {
                        return cb(lnwire.NewShortChanIDFromInt(
                                byteOrder.Uint64(k),
                        ))
                })
        }, reset)
}
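
// countClosedSCIDs is an editorial usage sketch of forEachClosedSCID and is
// not called by the migration: it counts the closed SCIDs in the KV store.
// The reset closure must rewind any state accumulated in the callback, since
// the underlying kvdb.View transaction may be retried from scratch.
func countClosedSCIDs(db kvdb.Backend) (uint64, error) {
        var count uint64
        err := forEachClosedSCID(db, func(_ lnwire.ShortChannelID) error {
                count++
                return nil
        }, func() {
                // Reset the counter if the read transaction is retried.
                count = 0
        })
        if err != nil {
                return 0, fmt.Errorf("could not count closed SCIDs: %w", err)
        }

        return count, nil
}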

// insertNodeSQLMig inserts the node record into the database during the graph
// SQL migration. No error is expected if the node already exists. Unlike the
// main upsertNode function, this function does not require that a new node
// update have a newer timestamp than the existing one. This is because we want
// the migration to be idempotent and don't want to error out if we re-insert
// the exact same node.
func insertNodeSQLMig(ctx context.Context, db SQLQueries,
        node *models.Node) (int64, error) {

        params := sqlc.InsertNodeMigParams{
                Version: int16(lnwire.GossipVersion1),
                PubKey:  node.PubKeyBytes[:],
        }

        if node.HaveNodeAnnouncement {
                params.LastUpdate = sqldb.SQLInt64(node.LastUpdate.Unix())
                params.Color = sqldb.SQLStrValid(EncodeHexColor(node.Color))
                params.Alias = sqldb.SQLStrValid(node.Alias)
                params.Signature = node.AuthSigBytes
        }

        nodeID, err := db.InsertNodeMig(ctx, params)
        if err != nil {
                return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
                        err)
        }

        // We can exit here if we don't have the announcement yet.
        if !node.HaveNodeAnnouncement {
                return nodeID, nil
        }

        // Insert the node's features.
        for feature := range node.Features.Features() {
                err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{
                        NodeID:     nodeID,
                        FeatureBit: int32(feature),
                })
                if err != nil {
                        return 0, fmt.Errorf("unable to insert node(%d) "+
                                "feature(%v): %w", nodeID, feature, err)
                }
        }

        // Update the node's addresses.
        newAddresses, err := collectAddressRecords(node.Addresses)
        if err != nil {
                return 0, err
        }

        // Any remaining entries in newAddresses are new addresses that need to
        // be added to the database for the first time.
        for addrType, addrList := range newAddresses {
                for position, addr := range addrList {
                        err := db.UpsertNodeAddress(
                                ctx, sqlc.UpsertNodeAddressParams{
                                        NodeID:   nodeID,
                                        Type:     int16(addrType),
                                        Address:  addr,
                                        Position: int32(position),
                                },
                        )
                        if err != nil {
                                return 0, fmt.Errorf("unable to insert "+
                                        "node(%d) address(%v): %w", nodeID,
                                        addr, err)
                        }
                }
        }

        // Convert the flat extra opaque data into a map of TLV types to
        // values.
        extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
        if err != nil {
                return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
                        err)
        }

        // Insert the node's extra signed fields.
        for tlvType, value := range extra {
                err = db.UpsertNodeExtraType(
                        ctx, sqlc.UpsertNodeExtraTypeParams{
                                NodeID: nodeID,
                                Type:   int64(tlvType),
                                Value:  value,
                        },
                )
                if err != nil {
                        return 0, fmt.Errorf("unable to upsert node(%d) extra "+
                                "signed field(%v): %w", nodeID, tlvType, err)
                }
        }

        return nodeID, nil
}
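
// reinsertNodeSQLMig is an editorial sketch, not part of the migration: it
// demonstrates the idempotency property described in the comment on
// insertNodeSQLMig, namely that re-inserting the exact same node a second
// time is expected to succeed rather than fail, because no "newer timestamp"
// check is applied.
func reinsertNodeSQLMig(ctx context.Context, db SQLQueries,
        node *models.Node) error {

        if _, err := insertNodeSQLMig(ctx, db, node); err != nil {
                return fmt.Errorf("initial insert failed: %w", err)
        }

        // A second call with the identical node must not error out.
        if _, err := insertNodeSQLMig(ctx, db, node); err != nil {
                return fmt.Errorf("re-insert was not idempotent: %w", err)
        }

        return nil
}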

// dbChanInfo holds the DB level IDs of a channel and the nodes involved in the
// channel.
type dbChanInfo struct {
        channelID int64
        node1ID   int64
        node2ID   int64
}

// insertChannelMig inserts a new channel record into the database during the
// graph SQL migration.
func insertChannelMig(ctx context.Context, db SQLQueries,
        edge *models.ChannelEdgeInfo) (*dbChanInfo, error) {

        // Make sure that at least a "shell" entry for each node is present in
        // the nodes table.
        //
        // NOTE: we need this even during the SQL migration where nodes are
        // migrated first because there are cases where some nodes may have
        // been skipped due to invalid TLV data.
        node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
        if err != nil {
                return nil, fmt.Errorf("unable to create shell node: %w", err)
        }

        node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
        if err != nil {
                return nil, fmt.Errorf("unable to create shell node: %w", err)
        }

        var capacity sql.NullInt64
        if edge.Capacity != 0 {
                capacity = sqldb.SQLInt64(int64(edge.Capacity))
        }

        createParams := sqlc.InsertChannelMigParams{
                Version:     int16(lnwire.GossipVersion1),
                Scid:        channelIDToBytes(edge.ChannelID),
                NodeID1:     node1DBID,
                NodeID2:     node2DBID,
                Outpoint:    edge.ChannelPoint.String(),
                Capacity:    capacity,
                BitcoinKey1: edge.BitcoinKey1Bytes[:],
                BitcoinKey2: edge.BitcoinKey2Bytes[:],
        }

        if edge.AuthProof != nil {
                proof := edge.AuthProof

                createParams.Node1Signature = proof.NodeSig1Bytes
                createParams.Node2Signature = proof.NodeSig2Bytes
                createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
                createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
        }

        // Insert the new channel record.
        dbChanID, err := db.InsertChannelMig(ctx, createParams)
        if err != nil {
                return nil, err
        }

        // Insert any channel features.
        for feature := range edge.Features.Features() {
                err = db.InsertChannelFeature(
                        ctx, sqlc.InsertChannelFeatureParams{
                                ChannelID:  dbChanID,
                                FeatureBit: int32(feature),
                        },
                )
                if err != nil {
                        return nil, fmt.Errorf("unable to insert channel(%d) "+
                                "feature(%v): %w", dbChanID, feature, err)
                }
        }

        // Finally, insert any extra TLV fields in the channel announcement.
        extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
        if err != nil {
                return nil, fmt.Errorf("unable to marshal extra opaque "+
                        "data: %w", err)
        }

        for tlvType, value := range extra {
                err := db.UpsertChannelExtraType(
                        ctx, sqlc.UpsertChannelExtraTypeParams{
                                ChannelID: dbChanID,
                                Type:      int64(tlvType),
                                Value:     value,
                        },
                )
                if err != nil {
                        return nil, fmt.Errorf("unable to upsert "+
                                "channel(%d) extra signed field(%v): %w",
                                edge.ChannelID, tlvType, err)
                }
        }

        return &dbChanInfo{
                channelID: dbChanID,
                node1ID:   node1DBID,
                node2ID:   node2DBID,
        }, nil
}
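
// nullableCapacity is a hypothetical helper shown for illustration only; it
// mirrors the capacity handling in insertChannelMig above, where a zero
// capacity is stored as SQL NULL rather than an explicit 0.
func nullableCapacity(capacity int64) sql.NullInt64 {
        if capacity == 0 {
                // Leave the column NULL when no capacity is known.
                return sql.NullInt64{}
        }

        return sqldb.SQLInt64(capacity)
}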

// insertChanEdgePolicyMig inserts the channel policy info we have stored for
// a channel we already know of. This is used during the SQL migration
// process to insert channel policies.
func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries,
        dbChan *dbChanInfo, edge *models.ChannelEdgePolicy) error {

        // Figure out which node this edge is from.
        isNode1 := edge.ChannelFlags&lnwire.ChanUpdateDirection == 0
        nodeID := dbChan.node1ID
        if !isNode1 {
                nodeID = dbChan.node2ID
        }

        var (
                inboundBase sql.NullInt64
                inboundRate sql.NullInt64
        )
        edge.InboundFee.WhenSome(func(fee lnwire.Fee) {
                inboundRate = sqldb.SQLInt64(fee.FeeRate)
                inboundBase = sqldb.SQLInt64(fee.BaseFee)
        })

        id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{
                Version:     int16(lnwire.GossipVersion1),
                ChannelID:   dbChan.channelID,
                NodeID:      nodeID,
                Timelock:    int32(edge.TimeLockDelta),
                FeePpm:      int64(edge.FeeProportionalMillionths),
                BaseFeeMsat: int64(edge.FeeBaseMSat),
                MinHtlcMsat: int64(edge.MinHTLC),
                LastUpdate:  sqldb.SQLInt64(edge.LastUpdate.Unix()),
                Disabled: sql.NullBool{
                        Valid: true,
                        Bool:  edge.IsDisabled(),
                },
                MaxHtlcMsat: sql.NullInt64{
                        Valid: edge.MessageFlags.HasMaxHtlc(),
                        Int64: int64(edge.MaxHTLC),
                },
                MessageFlags:            sqldb.SQLInt16(edge.MessageFlags),
                ChannelFlags:            sqldb.SQLInt16(edge.ChannelFlags),
                InboundBaseFeeMsat:      inboundBase,
                InboundFeeRateMilliMsat: inboundRate,
                Signature:               edge.SigBytes,
        })
        if err != nil {
                return fmt.Errorf("unable to upsert edge policy: %w", err)
        }

        // Convert the flat extra opaque data into a map of TLV types to
        // values.
        extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
        if err != nil {
                return fmt.Errorf("unable to marshal extra opaque data: %w",
                        err)
        }

        // Insert all new extra signed fields for the channel policy.
        for tlvType, value := range extra {
                err = tx.UpsertChanPolicyExtraType(
                        ctx, sqlc.UpsertChanPolicyExtraTypeParams{
                                ChannelPolicyID: id,
                                Type:            int64(tlvType),
                                Value:           value,
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to insert "+
                                "channel_policy(%d) extra signed field(%v): %w",
                                id, tlvType, err)
                }
        }

        return nil
}
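
// policyNodeID is an editorial sketch mirroring the direction check at the
// top of insertChanEdgePolicyMig: the lnwire.ChanUpdateDirection bit in the
// policy's channel flags selects whether the update belongs to node 1 (bit
// unset) or node 2 (bit set) of the channel row.
func policyNodeID(dbChan *dbChanInfo,
        flags lnwire.ChanUpdateChanFlags) int64 {

        if flags&lnwire.ChanUpdateDirection == 0 {
                return dbChan.node1ID
        }

        return dbChan.node2ID
}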