lightningnetwork / lnd / 16989312393

15 Aug 2025 11:45AM UTC coverage: 66.728%. First build
Merge a69762f3b into 6b38f8d9d
Pull Request #10162: [Draft] graph/db: unwrap dns addresses from opaque ones during migration

147 of 180 new or added lines in 2 files covered. (81.67%)

135940 of 203724 relevant lines covered (66.73%)

21517.15 hits per line

Source File: /graph/db/sql_migration.go (file coverage: 0.0%)

package graphdb

import (
	"bytes"
	"cmp"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net"
	"slices"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/sqldb"
	"github.com/lightningnetwork/lnd/sqldb/sqlc"
	"golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path other than tests. It
// will be called from the main lnd binary once the migration is fully
// implemented and tested.
func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()

	// Check if there is a graph to migrate.
	graphExists, err := checkGraphExists(kvBackend)
	if err != nil {
		return fmt.Errorf("failed to check graph existence: %w", err)
	}
	if !graphExists {
		log.Infof("No graph found in KV store, skipping the migration")
		return nil
	}

	// 1) Migrate all the nodes.
	err = migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// 2) Migrate the source node.
	if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate source node: %w", err)
	}

	// 3) Migrate all the channels and channel policies.
	err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	// 4) Migrate the prune log.
	err = migratePruneLog(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// 5) Migrate the closed SCID index.
	err = migrateClosedSCIDIndex(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// 6) Migrate the zombie index.
	err = migrateZombieIndex(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

	return nil
}

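// runGraphMigration is a hypothetical test helper, not part of lnd, that
// sketches how the migration above is expected to be driven from tests for
// now. It relies only on the MigrateGraphToSQL signature defined above and
// assumes the caller has already set up the KV backend, the SQLStoreConfig
// and a SQLQueries implementation (it would live in a _test.go file with the
// testing package imported).
func runGraphMigration(t *testing.T, cfg *SQLStoreConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) {

	t.Helper()

	// Run the full KV-to-SQL graph migration and fail the test on any
	// error.
	err := MigrateGraphToSQL(context.Background(), cfg, kvBackend, sqlDB)
	if err != nil {
		t.Fatalf("graph migration failed: %v", err)
	}
}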

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
	// Check if there is even a graph to migrate.
	err := db.View(func(tx kvdb.RTx) error {
		// Check for the existence of the node bucket which is a top
		// level bucket that would have been created on the initial
		// creation of the graph store.
		nodes := tx.ReadBucket(nodeBucket)
		if nodes == nil {
			return ErrGraphNotFound
		}

		return nil
	}, func() {})
	if errors.Is(err, ErrGraphNotFound) {
		return false, nil
	} else if err != nil {
		return false, err
	}

	return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// It collects nodes in batches, inserts them individually, and then validates
// them in batches.
func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	// Keep track of the number of nodes migrated and the number of
	// nodes skipped due to errors.
	var (
		totalTime = time.Now()

		count   uint64
		skipped uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	// batch is a map that holds node objects that have been migrated to
	// the native SQL store but have yet to be validated. The objects held
	// by this map were derived from the KVDB store, and so when they are
	// validated, the map index (the SQL store node ID) will be used to
	// fetch the corresponding node object in the SQL store, and it will
	// then be compared against the original KVDB node object.
	batch := make(
		map[int64]*models.LightningNode, cfg.MaxBatchSize,
	)

	// validateBatch validates that the batch of nodes in the 'batch' map
	// have been migrated successfully.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		// Extract DB node IDs.
		dbIDs := make([]int64, 0, len(batch))
		for dbID := range batch {
			dbIDs = append(dbIDs, dbID)
		}

		// Batch fetch all nodes from the database.
		dbNodes, err := sqlDB.GetNodesByIDs(ctx, dbIDs)
		if err != nil {
			return fmt.Errorf("could not batch fetch nodes: %w",
				err)
		}

		// Make sure that the number of nodes fetched matches the number
		// of nodes in the batch.
		if len(dbNodes) != len(batch) {
			return fmt.Errorf("expected to fetch %d nodes, "+
				"but got %d", len(batch), len(dbNodes))
		}

		// Now, batch fetch the normalised data for all the nodes in
		// the batch.
		batchData, err := batchLoadNodeData(ctx, cfg, sqlDB, dbIDs)
		if err != nil {
			return fmt.Errorf("unable to batch load node data: %w",
				err)
		}

		for _, dbNode := range dbNodes {
			// Get the KVDB node info from the batch map.
			node, ok := batch[dbNode.ID]
			if !ok {
				return fmt.Errorf("node with ID %d not found "+
					"in batch", dbNode.ID)
			}

			// Build the migrated node from the DB node and the
			// batch node data.
			migNode, err := buildNodeWithBatchData(
				dbNode, batchData,
			)
			if err != nil {
				return fmt.Errorf("could not build migrated "+
					"node from dbNode(db id: %d, node "+
					"pub: %x): %w", dbNode.ID,
					node.PubKeyBytes, err)
			}

			// Make sure that the node addresses are sorted before
			// comparing them to ensure that the order of addresses
			// does not affect the comparison.
			slices.SortFunc(
				node.Addresses, func(i, j net.Addr) int {
					return cmp.Compare(
						i.String(), j.String(),
					)
				},
			)
			slices.SortFunc(
				migNode.Addresses, func(i, j net.Addr) int {
					return cmp.Compare(
						i.String(), j.String(),
					)
				},
			)

			err = sqldb.CompareRecords(
				node, migNode,
				fmt.Sprintf("node %x", node.PubKeyBytes),
			)
			if err != nil {
				return fmt.Errorf("node mismatch after "+
					"migration for node %x: %w",
					node.PubKeyBytes, err)
			}
		}

		// Clear the batch map for the next iteration.
		batch = make(
			map[int64]*models.LightningNode, cfg.MaxBatchSize,
		)

		return nil
	}

	// Loop through each node in the KV store and insert it into the SQL
	// database.
	err := forEachNode(kvBackend, func(_ kvdb.RTx,
		node *models.LightningNode) error {

		pub := node.PubKeyBytes

		// Sanity check to ensure that the node has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a node with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skipped++
			log.Warnf("Skipping migration of node %x with invalid "+
				"extra opaque data: %v", pub,
				node.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra "+
				"opaque data for node %x: %w", pub, err)
		}

		count++
		chunk++

		// Unwrap any known address types (such as DNS addresses) from
		// addresses that were previously stored as a single opaque
		// payload so that they are persisted in their normalised form.
		// These lines are the new logic added in this PR.
		addrs := make([]net.Addr, 0, len(node.Addresses))
		for _, addr := range node.Addresses {
			opaque, ok := addr.(*lnwire.OpaqueAddrs)
			if !ok {
				addrs = append(addrs, addr)
				continue
			}

			r := bytes.NewReader(opaque.Payload)
			numAddrBytes := uint16(len(opaque.Payload))
			byteRead, readAddr, err := lnwire.ReadAddress(
				r, numAddrBytes,
			)
			if err != nil {
				// If we can't parse it, we just keep the
				// address as opaque.
				addrs = append(addrs, addr)

				continue
			}

			if readAddr != nil {
				addrs = append(addrs, readAddr)
			}

			if byteRead == numAddrBytes {
				continue
			}

			addrs = append(addrs, &lnwire.OpaqueAddrs{
				Payload: opaque.Payload[byteRead:],
			})
		}
		if len(addrs) != 0 {
			node.Addresses = addrs
		}

		// Write the node to the SQL database.
		id, err := upsertNode(ctx, sqlDB, node)
		if err != nil {
			return fmt.Errorf("could not persist node(%x): %w", pub,
				err)
		}

		// Add to validation batch.
		batch[id] = node

		// Validate batch when full.
		if len(batch) >= int(cfg.MaxBatchSize) {
			err := validateBatch()
			if err != nil {
				return fmt.Errorf("batch validation failed: %w",
					err)
			}
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
				count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	}, func() {
		// No reset is needed since if a retry occurs, the entire
		// migration will be retried from the start.
	})
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// Validate any remaining nodes in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	log.Infof("Migrated %d nodes from KV to SQL in %v (skipped %d nodes "+
		"due to invalid TLV streams)", count, time.Since(totalTime),
		skipped)

	return nil
}

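// unwrapOpaqueAddrs is an illustrative, hypothetical helper (not part of
// lnd) that isolates the address unwrapping performed inline above: it
// attempts to decode a known address type (such as the DNS addresses this
// PR targets) from a single OpaqueAddrs payload and keeps any bytes it
// cannot interpret wrapped as opaque. It uses only the lnwire calls already
// used by migrateNodes.
func unwrapOpaqueAddrs(opaque *lnwire.OpaqueAddrs) []net.Addr {
	var addrs []net.Addr

	r := bytes.NewReader(opaque.Payload)
	numAddrBytes := uint16(len(opaque.Payload))

	// Try to read a single known address from the opaque payload.
	bytesRead, readAddr, err := lnwire.ReadAddress(r, numAddrBytes)
	if err != nil {
		// Nothing could be parsed, so keep the payload opaque.
		return []net.Addr{opaque}
	}

	if readAddr != nil {
		addrs = append(addrs, readAddr)
	}

	// Preserve any trailing bytes that were not consumed.
	if bytesRead != numAddrBytes {
		addrs = append(addrs, &lnwire.OpaqueAddrs{
			Payload: opaque.Payload[bytesRead:],
		})
	}

	return addrs
}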

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
	sqlDB SQLQueries) error {

	log.Debugf("Migrating source node from KV to SQL")

	sourceNode, err := sourceNode(kvdb)
	if errors.Is(err, ErrSourceNodeNotSet) {
		// If the source node has not been set yet, we can skip this
		// migration step.
		return nil
	} else if err != nil {
		return fmt.Errorf("could not get source node from kv "+
			"store: %w", err)
	}

	pub := sourceNode.PubKeyBytes

	// Get the DB ID of the source node by its public key. This node must
	// already exist in the SQL database, as it should have been migrated
	// in the previous node-migration step.
	id, err := sqlDB.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			PubKey:  pub[:],
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get source node ID: %w", err)
	}

	// Now we can add the source node to the SQL database.
	err = sqlDB.AddSourceNode(ctx, id)
	if err != nil {
		return fmt.Errorf("could not add source node to SQL store: %w",
			err)
	}

	// Verify that the source node was added correctly by fetching it back
	// from the SQL database and checking that the expected DB ID and
	// pub key are returned. We don't need to do a whole node comparison
	// here, as this was already done in the previous migration step.
	srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
	if err != nil {
		return fmt.Errorf("could not get source nodes from SQL "+
			"store: %w", err)
	}

	// The SQL store has support for multiple source nodes (for future
	// protocol versions) but this migration is purely aimed at the V1
	// store, and so we expect exactly one source node to be present.
	if len(srcNodes) != 1 {
		return fmt.Errorf("expected exactly one source node, "+
			"got %d", len(srcNodes))
	}

	// Check that the source node ID and pub key match the original
	// source node.
	if srcNodes[0].NodeID != id {
		return fmt.Errorf("source node ID mismatch after migration: "+
			"expected %d, got %d", id, srcNodes[0].NodeID)
	}
	err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
	if err != nil {
		return fmt.Errorf("source node pubkey mismatch after "+
			"migration: %w", err)
	}

	log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

	return nil
}

// migChanInfo holds the information about a channel and its policies.
type migChanInfo struct {
	// edge is the channel object as read from the KVDB source.
	edge *models.ChannelEdgeInfo

	// policy1 is the first channel policy for the channel as read from
	// the KVDB source.
	policy1 *models.ChannelEdgePolicy

	// policy2 is the second channel policy for the channel as read
	// from the KVDB source.
	policy2 *models.ChannelEdgePolicy

	// dbInfo holds location info (in the form of DB IDs) of the channel
	// and its policies in the native-SQL destination.
	dbInfo *dbChanInfo
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		channelCount       uint64
		skippedChanCount   uint64
		policyCount        uint64
		skippedPolicyCount uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)
	migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
		// If the policy is nil, we can skip it.
		if policy == nil {
			return nil
		}

		// Unlike the special case of invalid TLV bytes for node and
		// channel announcements, we don't need to handle the case for
		// channel policies here because it is already handled in the
		// `forEachChannel` function. If the policy has invalid TLV
		// bytes, then `nil` will be passed to this function.

		policyCount++

		_, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
		if err != nil {
			return fmt.Errorf("could not migrate channel "+
				"policy %d: %w", policy.ChannelID, err)
		}

		return nil
	}

	// batch is used to collect migrated channel info that we will
	// batch-validate. Each entry is indexed by the DB ID of the channel
	// in the SQL database.
	batch := make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)

	// Iterate over each channel in the KV store and migrate it and its
	// policies to the SQL database.
	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
		policy1 *models.ChannelEdgePolicy,
		policy2 *models.ChannelEdgePolicy) error {

		scid := channel.ChannelID

		// Here, we do a sanity check to ensure that the chain hash of
		// the channel returned by the KV store matches the expected
		// chain hash. This is important since in the SQL store, we will
		// no longer explicitly store the chain hash in the channel
		// info, but rather rely on the chain hash LND is running with.
		// So this is our way of ensuring that LND is running on the
		// correct network at migration time.
		if channel.ChainHash != cfg.ChainHash {
			return fmt.Errorf("channel %d has chain hash %s, "+
				"expected %s", scid, channel.ChainHash,
				cfg.ChainHash)
		}

		// Sanity check to ensure that the channel has valid extra
		// opaque data. If it does not, we'll skip it. We need to do
		// this because previously we would just persist any TLV bytes
		// that we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a channel with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			log.Warnf("Skipping channel %d with invalid "+
				"extra opaque data: %v", scid,
				channel.ExtraOpaqueData)

			skippedChanCount++

			// If we skip a channel, we also skip its policies.
			if policy1 != nil {
				skippedPolicyCount++
			}
			if policy2 != nil {
				skippedPolicyCount++
			}

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data for channel %d (%v): %w", scid,
				channel.ExtraOpaqueData, err)
		}

		channelCount++
		chunk++

		// Migrate the channel info along with its policies.
		dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
		if err != nil {
			return fmt.Errorf("could not insert record for "+
				"channel %d in SQL store: %w", scid, err)
		}

		// Now, migrate the two channel policies for the channel.
		err = migChanPolicy(policy1)
		if err != nil {
			return fmt.Errorf("could not migrate policy1(%d): %w",
				scid, err)
		}
		err = migChanPolicy(policy2)
		if err != nil {
			return fmt.Errorf("could not migrate policy2(%d): %w",
				scid, err)
		}

		// Collect the migrated channel info and policies in a batch for
		// later validation.
		batch[dbChanInfo.channelID] = &migChanInfo{
			edge:    channel,
			policy1: policy1,
			policy2: policy2,
			dbInfo:  dbChanInfo,
		}

		if len(batch) >= int(cfg.QueryCfg.MaxBatchSize) {
			// Do batch validation.
			err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
			if err != nil {
				return fmt.Errorf("could not validate "+
					"channel batch: %w", err)
			}

			batch = make(
				map[int64]*migChanInfo,
				cfg.QueryCfg.MaxBatchSize,
			)
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d channels (%.2f channels/sec)",
				channelCount, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	}, func() {
		// No reset is needed since if a retry occurs, the entire
		// migration will be retried from the start.
	})
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	if len(batch) > 0 {
		// Do a final batch validation for any remaining channels.
		err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
		if err != nil {
			return fmt.Errorf("could not validate final channel "+
				"batch: %w", err)
		}

		batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
	}

	log.Infof("Migrated %d channels and %d policies from KV to SQL in %s "+
		"(skipped %d channels and %d policies due to invalid TLV "+
		"streams)", channelCount, policyCount, time.Since(totalTime),
		skippedChanCount, skippedPolicyCount)

	return nil
}

// validateMigratedChannels validates the channels in the batch after they have
// been migrated to the SQL database. It batch fetches all channels by their IDs
// and compares the migrated channels and their policies with the original ones
// to ensure they match, using batched queries to load the associated data.
func validateMigratedChannels(ctx context.Context, cfg *SQLStoreConfig,
	sqlDB SQLQueries, batch map[int64]*migChanInfo) error {

	// Convert batch keys (DB IDs) to an int slice for the batch query.
	dbChanIDs := make([]int64, 0, len(batch))
	for id := range batch {
		dbChanIDs = append(dbChanIDs, id)
	}

	// Batch fetch all channels with their policies.
	rows, err := sqlDB.GetChannelsByIDs(ctx, dbChanIDs)
	if err != nil {
		return fmt.Errorf("could not batch get channels by IDs: %w",
			err)
	}

	// Sanity check that the same number of channels were returned
	// as requested.
	if len(rows) != len(dbChanIDs) {
		return fmt.Errorf("expected to fetch %d channels, "+
			"but got %d", len(dbChanIDs), len(rows))
	}

	// Collect all policy IDs needed for batch data loading.
	dbPolicyIDs := make([]int64, 0, len(dbChanIDs)*2)

	for _, row := range rows {
		scid := byteOrder.Uint64(row.GraphChannel.Scid)

		dbPol1, dbPol2, err := extractChannelPolicies(row)
		if err != nil {
			return fmt.Errorf("could not extract channel policies"+
				" for SCID %d: %w", scid, err)
		}
		if dbPol1 != nil {
			dbPolicyIDs = append(dbPolicyIDs, dbPol1.ID)
		}
		if dbPol2 != nil {
			dbPolicyIDs = append(dbPolicyIDs, dbPol2.ID)
		}
	}

	// Batch load all channel and policy data (features, extras).
	batchData, err := batchLoadChannelData(
		ctx, cfg.QueryCfg, sqlDB, dbChanIDs, dbPolicyIDs,
	)
	if err != nil {
		return fmt.Errorf("could not batch load channel and policy "+
			"data: %w", err)
	}

	// Validate each channel in the batch using pre-loaded data.
	for _, row := range rows {
		kvdbChan, ok := batch[row.GraphChannel.ID]
		if !ok {
			return fmt.Errorf("channel with ID %d not found "+
				"in batch", row.GraphChannel.ID)
		}

		scid := byteOrder.Uint64(row.GraphChannel.Scid)

		err = validateMigratedChannelWithBatchData(
			cfg, scid, kvdbChan, row, batchData,
		)
		if err != nil {
			return fmt.Errorf("channel %d validation failed "+
				"after migration: %w", scid, err)
		}
	}

	return nil
}

// validateMigratedChannelWithBatchData validates a single migrated channel
// using pre-fetched batch data for optimal performance.
func validateMigratedChannelWithBatchData(cfg *SQLStoreConfig,
	scid uint64, info *migChanInfo, row sqlc.GetChannelsByIDsRow,
	batchData *batchChannelData) error {

	dbChanInfo := info.dbInfo
	channel := info.edge

	// Assert that the DB IDs for the channel and nodes are as expected
	// given the inserted channel info.
	err := sqldb.CompareRecords(
		dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
	)
	if err != nil {
		return err
	}
	err = sqldb.CompareRecords(
		dbChanInfo.node1ID, row.Node1ID, "node1 DB ID",
	)
	if err != nil {
		return err
	}
	err = sqldb.CompareRecords(
		dbChanInfo.node2ID, row.Node2ID, "node2 DB ID",
	)
	if err != nil {
		return err
	}

	// Build node vertices from the row data.
	node1, node2, err := buildNodeVertices(
		row.Node1PubKey, row.Node2PubKey,
	)
	if err != nil {
		return err
	}

	// Build channel info using batch data.
	migChan, err := buildEdgeInfoWithBatchData(
		cfg.ChainHash, row.GraphChannel, node1, node2, batchData,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel info: %w",
			err)
	}

	// Extract channel policies from the row.
	dbPol1, dbPol2, err := extractChannelPolicies(row)
	if err != nil {
		return fmt.Errorf("could not extract channel policies: %w", err)
	}

	// Build channel policies using batch data.
	migPol1, migPol2, err := buildChanPoliciesWithBatchData(
		dbPol1, dbPol2, scid, node1, node2, batchData,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel "+
			"policies: %w", err)
	}

	// Finally, compare the original channel info and
	// policies with the migrated ones to ensure they match.
	if len(channel.ExtraOpaqueData) == 0 {
		channel.ExtraOpaqueData = nil
	}
	if len(migChan.ExtraOpaqueData) == 0 {
		migChan.ExtraOpaqueData = nil
	}

	err = sqldb.CompareRecords(
		channel, migChan, fmt.Sprintf("channel %d", scid),
	)
	if err != nil {
		return err
	}

	checkPolicy := func(expPolicy,
		migPolicy *models.ChannelEdgePolicy) error {

		switch {
		// Both policies are nil, nothing to compare.
		case expPolicy == nil && migPolicy == nil:
			return nil

		// One of the policies is nil, but the other is not.
		case expPolicy == nil || migPolicy == nil:
			return fmt.Errorf("expected both policies to be "+
				"non-nil. Got expPolicy: %v, "+
				"migPolicy: %v", expPolicy, migPolicy)

		// Both policies are non-nil, we can compare them.
		default:
		}

		if len(expPolicy.ExtraOpaqueData) == 0 {
			expPolicy.ExtraOpaqueData = nil
		}
		if len(migPolicy.ExtraOpaqueData) == 0 {
			migPolicy.ExtraOpaqueData = nil
		}

		return sqldb.CompareRecords(
			*expPolicy, *migPolicy, "channel policy",
		)
	}

	err = checkPolicy(info.policy1, migPol1)
	if err != nil {
		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
			err)
	}

	err = checkPolicy(info.policy2, migPol2)
	if err != nil {
		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
			err)
	}

	return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It collects entries in batches, inserts them individually, and then
// validates them in batches using GetPruneEntriesForHeights for better
// performance.
func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	batch := make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)

	// validateBatch validates a batch of prune entries using batch query.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		// Extract heights for the batch query.
		heights := make([]int64, 0, len(batch))
		for height := range batch {
			heights = append(heights, int64(height))
		}

		// Batch fetch all entries from the database.
		rows, err := sqlDB.GetPruneEntriesForHeights(ctx, heights)
		if err != nil {
			return fmt.Errorf("could not batch get prune "+
				"entries: %w", err)
		}

		if len(rows) != len(batch) {
			return fmt.Errorf("expected to fetch %d prune "+
				"entries, but got %d", len(batch),
				len(rows))
		}

		// Validate each entry in the batch.
		for _, row := range rows {
			kvdbHash, ok := batch[uint32(row.BlockHeight)]
			if !ok {
				return fmt.Errorf("prune entry for height %d "+
					"not found in batch", row.BlockHeight)
			}

			err := sqldb.CompareRecords(
				kvdbHash[:], row.BlockHash,
				fmt.Sprintf("prune log entry at height %d",
					row.BlockHeight),
			)
			if err != nil {
				return err
			}
		}

		// Reset the batch map for the next iteration.
		batch = make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)

		return nil
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(
		kvBackend, func(height uint32, hash *chainhash.Hash) error {
			count++
			chunk++

			// Keep track of the prune tip height and hash.
			if height > pruneTipHeight {
				pruneTipHeight = height
				pruneTipHash = *hash
			}

			// Insert the entry (individual inserts for now).
			err := sqlDB.UpsertPruneLogEntry(
				ctx, sqlc.UpsertPruneLogEntryParams{
					BlockHeight: int64(height),
					BlockHash:   hash[:],
				},
			)
			if err != nil {
				return fmt.Errorf("unable to insert prune log "+
					"entry for height %d: %w", height, err)
			}

			// Add to validation batch.
			batch[height] = *hash

			// Validate batch when full.
			if len(batch) >= int(cfg.MaxBatchSize) {
				err := validateBatch()
				if err != nil {
					return fmt.Errorf("batch "+
						"validation failed: %w", err)
				}
			}

			s.Do(func() {
				elapsed := time.Since(t0).Seconds()
				ratePerSec := float64(chunk) / elapsed
				log.Debugf("Migrated %d prune log "+
					"entries (%.2f entries/sec)",
					count, ratePerSec)

				t0 = time.Now()
				chunk = 0
			})

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// Validate any remaining entries in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	// Check that the prune tip is set correctly in the SQL
	// database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// An ErrNoRows error (the SQL equivalent of
		// ErrGraphNeverPruned) is expected if no prune log entries
		// were migrated from the kvdb store. Otherwise, it's an
		// unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")
			return nil
		}
		// Fall-through to the next error check.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL in %s. "+
		"The prune tip is: height %d, hash: %s", count,
		time.Since(totalTime), pruneTipHeight, pruneTipHash)

	return nil
}

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
	hash *chainhash.Hash) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		metaBucket := tx.ReadBucket(graphMetaBucket)
		if metaBucket == nil {
			return ErrGraphNotFound
		}

		pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
		if pruneBucket == nil {
			// The graph has never been pruned and so, there are no
			// entries to iterate over.
			return nil
		}

		return pruneBucket.ForEach(func(k, v []byte) error {
			blockHeight := byteOrder.Uint32(k)
			var blockHash chainhash.Hash
			copy(blockHash[:], v)

			return cb(blockHeight, &blockHash)
		})
	}, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It collects SCIDs in batches, inserts them individually,
// and then validates them in batches using GetClosedChannelsSCIDs for better
// performance.
func migrateClosedSCIDIndex(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		count uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	batch := make([][]byte, 0, cfg.MaxBatchSize)

	// validateBatch validates a batch of closed SCIDs using batch query.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		// Batch fetch all closed SCIDs from the database.
		dbSCIDs, err := sqlDB.GetClosedChannelsSCIDs(ctx, batch)
		if err != nil {
			return fmt.Errorf("could not batch get closed "+
				"SCIDs: %w", err)
		}

		// Create set of SCIDs that exist in the database for quick
		// lookup.
		dbSCIDSet := make(map[string]struct{})
		for _, scid := range dbSCIDs {
			dbSCIDSet[string(scid)] = struct{}{}
		}

		// Validate each SCID in the batch.
		for _, expectedSCID := range batch {
			if _, found := dbSCIDSet[string(expectedSCID)]; !found {
				return fmt.Errorf("closed SCID %x not found "+
					"in database", expectedSCID)
			}
		}

		// Reset the batch for the next iteration.
		batch = make([][]byte, 0, cfg.MaxBatchSize)

		return nil
	}

	migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
		count++
		chunk++

		chanIDB := channelIDToBytes(scid.ToUint64())
		err := sqlDB.InsertClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not insert closed channel "+
				"with SCID %s: %w", scid, err)
		}

		// Add to validation batch.
		batch = append(batch, chanIDB)

		// Validate batch when full.
		if len(batch) >= int(cfg.MaxBatchSize) {
			err := validateBatch()
			if err != nil {
				return fmt.Errorf("batch validation failed: %w",
					err)
			}
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d closed scids "+
				"(%.2f entries/sec)", count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	}

	err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// Validate any remaining SCIDs in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	log.Infof("Migrated %d closed SCIDs from KV to SQL in %s", count,
		time.Since(totalTime))

	return nil
}

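// migrateWithBatchValidation is a hypothetical sketch (not part of lnd) of
// the pattern shared by the prune-log and closed-SCID migrations above and
// the zombie-index migration below: each record is inserted individually,
// remembered in an in-memory batch keyed by its identifier, and the batch is
// cross-checked against the destination with a single bulk query whenever it
// reaches maxBatchSize. The insertOne and fetchMany callbacks stand in for
// the sqlc queries used above.
func migrateWithBatchValidation(records map[uint32][]byte,
	insertOne func(id uint32, val []byte) error,
	fetchMany func(ids []uint32) (map[uint32][]byte, error),
	maxBatchSize int) error {

	batch := make(map[uint32][]byte, maxBatchSize)

	// validateBatch bulk-fetches everything inserted so far, compares it
	// against what was inserted, and then resets the batch.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		ids := make([]uint32, 0, len(batch))
		for id := range batch {
			ids = append(ids, id)
		}

		got, err := fetchMany(ids)
		if err != nil {
			return err
		}

		for id, want := range batch {
			if !bytes.Equal(got[id], want) {
				return fmt.Errorf("mismatch for id %d", id)
			}
		}

		batch = make(map[uint32][]byte, maxBatchSize)

		return nil
	}

	for id, val := range records {
		if err := insertOne(id, val); err != nil {
			return err
		}

		batch[id] = val
		if len(batch) >= maxBatchSize {
			if err := validateBatch(); err != nil {
				return err
			}
		}
	}

	// Validate whatever is left in the final partial batch.
	return validateBatch()
}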

// migrateZombieIndex migrates the zombie index from the KV backend to the SQL
// database. It collects zombie channels in batches, inserts them individually,
// and validates them in batches.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		count uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	type zombieEntry struct {
		pub1 route.Vertex
		pub2 route.Vertex
	}

	batch := make(map[uint64]*zombieEntry, cfg.MaxBatchSize)

	// validateBatch validates a batch of zombie SCIDs using batch query.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		scids := make([][]byte, 0, len(batch))
		for scid := range batch {
			scids = append(scids, channelIDToBytes(scid))
		}

		// Batch fetch all zombie channels from the database.
		rows, err := sqlDB.GetZombieChannelsSCIDs(
			ctx, sqlc.GetZombieChannelsSCIDsParams{
				Version: int16(ProtocolV1),
				Scids:   scids,
			},
		)
		if err != nil {
			return fmt.Errorf("could not batch get zombie "+
				"SCIDs: %w", err)
		}

		// Make sure that the number of rows returned matches
		// the number of SCIDs we requested.
		if len(rows) != len(scids) {
			return fmt.Errorf("expected to fetch %d zombie "+
				"SCIDs, but got %d", len(scids), len(rows))
		}

		// Validate each row is in the batch.
		for _, row := range rows {
			scid := byteOrder.Uint64(row.Scid)

			kvdbZombie, ok := batch[scid]
			if !ok {
				return fmt.Errorf("zombie SCID %x not found "+
					"in batch", scid)
			}

			err = sqldb.CompareRecords(
				kvdbZombie.pub1[:], row.NodeKey1,
				fmt.Sprintf("zombie pub key 1 (%s) for "+
					"channel %d", kvdbZombie.pub1, scid),
			)
			if err != nil {
				return err
			}

			err = sqldb.CompareRecords(
				kvdbZombie.pub2[:], row.NodeKey2,
				fmt.Sprintf("zombie pub key 2 (%s) for "+
					"channel %d", kvdbZombie.pub2, scid),
			)
			if err != nil {
				return err
			}
		}

		// Reset the batch for the next iteration.
		batch = make(map[uint64]*zombieEntry, cfg.MaxBatchSize)

		return nil
	}

	err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
		pubKey2 [33]byte) error {

		chanIDB := channelIDToBytes(chanID)

		// If it is in the closed SCID index, we don't need to
		// add it to the zombie index.
		//
		// NOTE: this means that the resulting zombie index count in
		// the SQL store may well be less than the count of zombie
		// channels in the KV store.
		isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not check closed "+
				"channel: %w", err)
		}
		if isClosed {
			return nil
		}

		count++
		chunk++

		err = sqlDB.UpsertZombieChannel(
			ctx, sqlc.UpsertZombieChannelParams{
				Version:  int16(ProtocolV1),
				Scid:     chanIDB,
				NodeKey1: pubKey1[:],
				NodeKey2: pubKey2[:],
			},
		)
		if err != nil {
			return fmt.Errorf("could not upsert zombie "+
				"channel %d: %w", chanID, err)
		}

		// Add to validation batch only after successful insertion.
		batch[chanID] = &zombieEntry{
			pub1: pubKey1,
			pub2: pubKey2,
		}

		// Validate batch when full.
		if len(batch) >= int(cfg.MaxBatchSize) {
			err := validateBatch()
			if err != nil {
				return fmt.Errorf("batch validation failed: %w",
					err)
			}
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d zombie index entries "+
				"(%.2f entries/sec)", count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	// Validate any remaining zombie SCIDs in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	log.Infof("Migrated %d zombie channels from KV to SQL in %s", count,
		time.Since(totalTime))

	return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
	pubKey2 [33]byte) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		return zombieIndex.ForEach(func(k, v []byte) error {
			var pubKey1, pubKey2 [33]byte
			copy(pubKey1[:], v[:33])
			copy(pubKey2[:], v[33:])

			return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
		})
	}, func() {})
}

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
	cb func(lnwire.ShortChannelID) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		closedScids := tx.ReadBucket(closedScidBucket)
		if closedScids == nil {
			return nil
		}

		return closedScids.ForEach(func(k, _ []byte) error {
			return cb(lnwire.NewShortChanIDFromInt(
				byteOrder.Uint64(k),
			))
		})
	}, func() {})
}
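
// logProgressSometimes is an illustrative, hypothetical helper (not lnd
// code) showing the rate.Sometimes pattern used by every migration loop in
// this file: the callback passed to Do runs at most once per Interval, so
// the per-item hot path stays cheap while progress is still reported
// periodically.
func logProgressSometimes(items [][]byte) {
	var (
		count uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	for range items {
		count++
		chunk++

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Processed %d items (%.2f items/sec)",
				count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})
	}
}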