lightningnetwork / lnd · build 16969463412 · push · github · web-flow
14 Aug 2025 03:23PM UTC · coverage: 66.776% (-0.2%) from 66.929%
Merge pull request #10155 from ziggie1984/add-missing-invoice-settle-index: Add missing invoice index for native sql
135916 of 203540 relevant lines covered (66.78%) · 21469.17 hits per line

Source file: /graph/db/sql_migration.go

package graphdb

import (
	"bytes"
	"cmp"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net"
	"slices"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/sqldb"
	"github.com/lightningnetwork/lnd/sqldb/sqlc"
	"golang.org/x/time/rate"
)

// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
// backend.
//
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	log.Infof("Starting migration of the graph store from KV to SQL")
	t0 := time.Now()

	// Check if there is a graph to migrate.
	graphExists, err := checkGraphExists(kvBackend)
	if err != nil {
		return fmt.Errorf("failed to check graph existence: %w", err)
	}
	if !graphExists {
		log.Infof("No graph found in KV store, skipping the migration")
		return nil
	}

	// 1) Migrate all the nodes.
	err = migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// 2) Migrate the source node.
	if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("could not migrate source node: %w", err)
	}

	// 3) Migrate all the channels and channel policies.
	err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	// 4) Migrate the Prune log.
	err = migratePruneLog(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// 5) Migrate the closed SCID index.
	err = migrateClosedSCIDIndex(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// 6) Migrate the zombie index.
	err = migrateZombieIndex(ctx, cfg.QueryCfg, kvBackend, sqlDB)
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	log.Infof("Finished migration of the graph store from KV to SQL in %v",
		time.Since(t0))

	return nil
}
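
// runGraphMigrationSketch is an illustrative sketch (not part of the original
// file) of how MigrateGraphToSQL is expected to be invoked once the migration
// is wired into the main lnd binary: the caller supplies a context, the SQL
// store config, the source KV backend and the destination SQL queries. The
// wrapper name and error message here are assumptions for illustration only.
func runGraphMigrationSketch(ctx context.Context, cfg *SQLStoreConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	if err := MigrateGraphToSQL(ctx, cfg, kvBackend, sqlDB); err != nil {
		return fmt.Errorf("graph store migration failed: %w", err)
	}

	return nil
}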

// checkGraphExists checks if the graph exists in the KV backend.
func checkGraphExists(db kvdb.Backend) (bool, error) {
	// Check if there is even a graph to migrate.
	err := db.View(func(tx kvdb.RTx) error {
		// Check for the existence of the node bucket which is a top
		// level bucket that would have been created on the initial
		// creation of the graph store.
		nodes := tx.ReadBucket(nodeBucket)
		if nodes == nil {
			return ErrGraphNotFound
		}

		return nil
	}, func() {})
	if errors.Is(err, ErrGraphNotFound) {
		return false, nil
	} else if err != nil {
		return false, err
	}

	return true, nil
}

// migrateNodes migrates all nodes from the KV backend to the SQL database.
// It collects nodes in batches, inserts them individually, and then validates
// them in batches.
func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	// Keep track of the number of nodes migrated and the number of
	// nodes skipped due to errors.
	var (
		totalTime = time.Now()

		count   uint64
		skipped uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	// batch is a map that holds node objects that have been migrated to
	// the native SQL store that have yet to be validated. The objects held
	// by this map were derived from the KVDB store and so when they are
	// validated, the map index (the SQL store node ID) will be used to
	// fetch the corresponding node object in the SQL store, and it will
	// then be compared against the original KVDB node object.
	batch := make(
		map[int64]*models.LightningNode, cfg.MaxBatchSize,
	)

	// validateBatch validates that the batch of nodes in the 'batch' map
	// have been migrated successfully.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		// Extract DB node IDs.
		dbIDs := make([]int64, 0, len(batch))
		for dbID := range batch {
			dbIDs = append(dbIDs, dbID)
		}

		// Batch fetch all nodes from the database.
		dbNodes, err := sqlDB.GetNodesByIDs(ctx, dbIDs)
		if err != nil {
			return fmt.Errorf("could not batch fetch nodes: %w",
				err)
		}

		// Make sure that the number of nodes fetched matches the number
		// of nodes in the batch.
		if len(dbNodes) != len(batch) {
			return fmt.Errorf("expected to fetch %d nodes, "+
				"but got %d", len(batch), len(dbNodes))
		}

		// Now, batch fetch the normalised data for all the nodes in
		// the batch.
		batchData, err := batchLoadNodeData(ctx, cfg, sqlDB, dbIDs)
		if err != nil {
			return fmt.Errorf("unable to batch load node data: %w",
				err)
		}

		for _, dbNode := range dbNodes {
			// Get the KVDB node info from the batch map.
			node, ok := batch[dbNode.ID]
			if !ok {
				return fmt.Errorf("node with ID %d not found "+
					"in batch", dbNode.ID)
			}

			// Build the migrated node from the DB node and the
			// batch node data.
			migNode, err := buildNodeWithBatchData(
				dbNode, batchData,
			)
			if err != nil {
				return fmt.Errorf("could not build migrated "+
					"node from dbNode(db id: %d, node "+
					"pub: %x): %w", dbNode.ID,
					node.PubKeyBytes, err)
			}

			// Make sure that the node addresses are sorted before
			// comparing them to ensure that the order of addresses
			// does not affect the comparison.
			slices.SortFunc(
				node.Addresses, func(i, j net.Addr) int {
					return cmp.Compare(
						i.String(), j.String(),
					)
				},
			)
			slices.SortFunc(
				migNode.Addresses, func(i, j net.Addr) int {
					return cmp.Compare(
						i.String(), j.String(),
					)
				},
			)

			err = sqldb.CompareRecords(
				node, migNode,
				fmt.Sprintf("node %x", node.PubKeyBytes),
			)
			if err != nil {
				return fmt.Errorf("node mismatch after "+
					"migration for node %x: %w",
					node.PubKeyBytes, err)
			}
		}

		// Clear the batch map for the next iteration.
		batch = make(
			map[int64]*models.LightningNode, cfg.MaxBatchSize,
		)

		return nil
	}

	// Loop through each node in the KV store and insert it into the SQL
	// database.
	err := forEachNode(kvBackend, func(_ kvdb.RTx,
		node *models.LightningNode) error {

		pub := node.PubKeyBytes

		// Sanity check to ensure that the node has valid extra opaque
		// data. If it does not, we'll skip it. We need to do this
		// because previously we would just persist any TLV bytes that
		// we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a node with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			skipped++
			log.Warnf("Skipping migration of node %x with invalid "+
				"extra opaque data: %v", pub,
				node.ExtraOpaqueData)

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra "+
				"opaque data for node %x: %w", pub, err)
		}

		count++
		chunk++

		// TODO(elle): At this point, we should check the loaded node
		// to see if we should extract any DNS addresses from its
		// opaque type addresses. This is expected to be done in:
		// https://github.com/lightningnetwork/lnd/pull/9455.
		// This TODO is being tracked in
		//  https://github.com/lightningnetwork/lnd/issues/9795 as this
		// must be addressed before making this code path active in
		// production.

		// Write the node to the SQL database.
		id, err := upsertNode(ctx, sqlDB, node)
		if err != nil {
			return fmt.Errorf("could not persist node(%x): %w", pub,
				err)
		}

		// Add to validation batch.
		batch[id] = node

		// Validate batch when full.
		if len(batch) >= int(cfg.MaxBatchSize) {
			err := validateBatch()
			if err != nil {
				return fmt.Errorf("batch validation failed: %w",
					err)
			}
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
				count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	}, func() {
		// No reset is needed since if a retry occurs, the entire
		// migration will be retried from the start.
	})
	if err != nil {
		return fmt.Errorf("could not migrate nodes: %w", err)
	}

	// Validate any remaining nodes in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	log.Infof("Migrated %d nodes from KV to SQL in %v (skipped %d nodes "+
		"due to invalid TLV streams)", count, time.Since(totalTime),
		skipped)

	return nil
}
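
// batchValidateSketch is an illustrative sketch (not part of the original
// file) of the insert-then-batch-validate pattern used by migrateNodes above
// and by the other migration helpers below: records are written one at a
// time, collected in a map keyed by their new SQL ID, and once the map holds
// maxBatch entries the whole batch is re-read with a single batched query and
// compared against the originals. The fetch callback is a hypothetical
// stand-in for a batched query such as GetNodesByIDs.
func batchValidateSketch(maxBatch int, records map[int64]string,
	fetch func(ids []int64) (map[int64]string, error)) error {

	batch := make(map[int64]string, maxBatch)

	// flush re-reads the current batch from the destination store and
	// compares each record against the original before resetting the
	// batch for the next chunk.
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}

		ids := make([]int64, 0, len(batch))
		for id := range batch {
			ids = append(ids, id)
		}

		got, err := fetch(ids)
		if err != nil {
			return err
		}
		if len(got) != len(batch) {
			return fmt.Errorf("expected %d records, got %d",
				len(batch), len(got))
		}

		for id, want := range batch {
			if got[id] != want {
				return fmt.Errorf("record %d mismatch after "+
					"migration", id)
			}
		}

		batch = make(map[int64]string, maxBatch)

		return nil
	}

	for id, rec := range records {
		// In the real migration the individual insert happens here;
		// this sketch only tracks the record for validation.
		batch[id] = rec

		if len(batch) >= maxBatch {
			if err := flush(); err != nil {
				return err
			}
		}
	}

	// Validate any remainder that did not fill a whole batch.
	return flush()
}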

// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
	sqlDB SQLQueries) error {

	log.Debugf("Migrating source node from KV to SQL")

	sourceNode, err := sourceNode(kvdb)
	if errors.Is(err, ErrSourceNodeNotSet) {
		// If the source node has not been set yet, we can skip this
		// migration step.
		return nil
	} else if err != nil {
		return fmt.Errorf("could not get source node from kv "+
			"store: %w", err)
	}

	pub := sourceNode.PubKeyBytes

	// Get the DB ID of the source node by its public key. This node must
	// already exist in the SQL database, as it should have been migrated
	// in the previous node-migration step.
	id, err := sqlDB.GetNodeIDByPubKey(
		ctx, sqlc.GetNodeIDByPubKeyParams{
			PubKey:  pub[:],
			Version: int16(ProtocolV1),
		},
	)
	if err != nil {
		return fmt.Errorf("could not get source node ID: %w", err)
	}

	// Now we can add the source node to the SQL database.
	err = sqlDB.AddSourceNode(ctx, id)
	if err != nil {
		return fmt.Errorf("could not add source node to SQL store: %w",
			err)
	}

	// Verify that the source node was added correctly by fetching it back
	// from the SQL database and checking that the expected DB ID and
	// pub key are returned. We don't need to do a whole node comparison
	// here, as this was already done in the previous migration step.
	srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
	if err != nil {
		return fmt.Errorf("could not get source nodes from SQL "+
			"store: %w", err)
	}

	// The SQL store has support for multiple source nodes (for future
	// protocol versions) but this migration is purely aimed at the V1
	// store, and so we expect exactly one source node to be present.
	if len(srcNodes) != 1 {
		return fmt.Errorf("expected exactly one source node, "+
			"got %d", len(srcNodes))
	}

	// Check that the source node ID and pub key match the original
	// source node.
	if srcNodes[0].NodeID != id {
		return fmt.Errorf("source node ID mismatch after migration: "+
			"expected %d, got %d", id, srcNodes[0].NodeID)
	}
	err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
	if err != nil {
		return fmt.Errorf("source node pubkey mismatch after "+
			"migration: %w", err)
	}

	log.Infof("Migrated source node with pubkey %x to SQL", pub[:])

	return nil
}

// migChanInfo holds the information about a channel and its policies.
type migChanInfo struct {
	// edge is the channel object as read from the KVDB source.
	edge *models.ChannelEdgeInfo

	// policy1 is the first channel policy for the channel as read from
	// the KVDB source.
	policy1 *models.ChannelEdgePolicy

	// policy2 is the second channel policy for the channel as read
	// from the KVDB source.
	policy2 *models.ChannelEdgePolicy

	// dbInfo holds location info (in the form of DB IDs) of the channel
	// and its policies in the native-SQL destination.
	dbInfo *dbChanInfo
}

// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		channelCount       uint64
		skippedChanCount   uint64
		policyCount        uint64
		skippedPolicyCount uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)
	migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
		// If the policy is nil, we can skip it.
		if policy == nil {
			return nil
		}

		// Unlike the special case of invalid TLV bytes for node and
		// channel announcements, we don't need to handle the case for
		// channel policies here because it is already handled in the
		// `forEachChannel` function. If the policy has invalid TLV
		// bytes, then `nil` will be passed to this function.

		policyCount++

		_, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
		if err != nil {
			return fmt.Errorf("could not migrate channel "+
				"policy %d: %w", policy.ChannelID, err)
		}

		return nil
	}

	// batch is used to collect migrated channel info that we will
	// batch-validate. Each entry is indexed by the DB ID of the channel
	// in the SQL database.
	batch := make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)

	// Iterate over each channel in the KV store and migrate it and its
	// policies to the SQL database.
	err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
		policy1 *models.ChannelEdgePolicy,
		policy2 *models.ChannelEdgePolicy) error {

		scid := channel.ChannelID

		// Here, we do a sanity check to ensure that the chain hash of
		// the channel returned by the KV store matches the expected
		// chain hash. This is important since in the SQL store, we will
		// no longer explicitly store the chain hash in the channel
		// info, but rather rely on the chain hash LND is running with.
		// So this is our way of ensuring that LND is running on the
		// correct network at migration time.
		if channel.ChainHash != cfg.ChainHash {
			return fmt.Errorf("channel %d has chain hash %s, "+
				"expected %s", scid, channel.ChainHash,
				cfg.ChainHash)
		}

		// Sanity check to ensure that the channel has valid extra
		// opaque data. If it does not, we'll skip it. We need to do
		// this because previously we would just persist any TLV bytes
		// that we received without validating them. Now, however, we
		// normalise the storage of extra opaque data, so we need to
		// ensure that the data is valid. We don't want to abort the
		// migration if we encounter a channel with invalid extra opaque
		// data, so we'll just skip it and log a warning.
		_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
		if errors.Is(err, ErrParsingExtraTLVBytes) {
			log.Warnf("Skipping channel %d with invalid "+
				"extra opaque data: %v", scid,
				channel.ExtraOpaqueData)

			skippedChanCount++

			// If we skip a channel, we also skip its policies.
			if policy1 != nil {
				skippedPolicyCount++
			}
			if policy2 != nil {
				skippedPolicyCount++
			}

			return nil
		} else if err != nil {
			return fmt.Errorf("unable to marshal extra opaque "+
				"data for channel %d (%v): %w", scid,
				channel.ExtraOpaqueData, err)
		}

		channelCount++
		chunk++

		// Migrate the channel info along with its policies.
		dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
		if err != nil {
			return fmt.Errorf("could not insert record for "+
				"channel %d in SQL store: %w", scid, err)
		}

		// Now, migrate the two channel policies for the channel.
		err = migChanPolicy(policy1)
		if err != nil {
			return fmt.Errorf("could not migrate policy1(%d): %w",
				scid, err)
		}
		err = migChanPolicy(policy2)
		if err != nil {
			return fmt.Errorf("could not migrate policy2(%d): %w",
				scid, err)
		}

		// Collect the migrated channel info and policies in a batch for
		// later validation.
		batch[dbChanInfo.channelID] = &migChanInfo{
			edge:    channel,
			policy1: policy1,
			policy2: policy2,
			dbInfo:  dbChanInfo,
		}

		if len(batch) >= int(cfg.QueryCfg.MaxBatchSize) {
			// Do batch validation.
			err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
			if err != nil {
				return fmt.Errorf("could not validate "+
					"channel batch: %w", err)
			}

			batch = make(
				map[int64]*migChanInfo,
				cfg.QueryCfg.MaxBatchSize,
			)
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d channels (%.2f channels/sec)",
				channelCount, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	}, func() {
		// No reset is needed since if a retry occurs, the entire
		// migration will be retried from the start.
	})
	if err != nil {
		return fmt.Errorf("could not migrate channels and policies: %w",
			err)
	}

	if len(batch) > 0 {
		// Do a final batch validation for any remaining channels.
		err := validateMigratedChannels(ctx, cfg, sqlDB, batch)
		if err != nil {
			return fmt.Errorf("could not validate final channel "+
				"batch: %w", err)
		}

		batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
	}

	log.Infof("Migrated %d channels and %d policies from KV to SQL in %s "+
		"(skipped %d channels and %d policies due to invalid TLV "+
		"streams)", channelCount, policyCount, time.Since(totalTime),
		skippedChanCount, skippedPolicyCount)

	return nil
}
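
// progressLogSketch is an illustrative sketch (not part of the original file)
// of the rate-limited progress logging used throughout this migration:
// rate.Sometimes.Do runs its callback at most once per Interval, so the
// per-chunk throughput is logged roughly every 10 seconds regardless of how
// many records are processed in between. The process callback is a
// hypothetical stand-in for handling a single record; it returns false when
// there is nothing left to do.
func progressLogSketch(process func() bool) {
	var (
		t0    = time.Now()
		count uint64
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	for process() {
		count++
		chunk++

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			log.Debugf("Processed %d records (%.2f records/sec)",
				count, float64(chunk)/elapsed)

			// Start a fresh measurement window.
			t0 = time.Now()
			chunk = 0
		})
	}
}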

// validateMigratedChannels validates the channels in the batch after they have
// been migrated to the SQL database. It batch fetches all channels by their IDs
// and compares the migrated channels and their policies with the original ones
// to ensure they match using batch construction patterns.
func validateMigratedChannels(ctx context.Context, cfg *SQLStoreConfig,
	sqlDB SQLQueries, batch map[int64]*migChanInfo) error {

	// Convert batch keys (DB IDs) to an int slice for the batch query.
	dbChanIDs := make([]int64, 0, len(batch))
	for id := range batch {
		dbChanIDs = append(dbChanIDs, id)
	}

	// Batch fetch all channels with their policies.
	rows, err := sqlDB.GetChannelsByIDs(ctx, dbChanIDs)
	if err != nil {
		return fmt.Errorf("could not batch get channels by IDs: %w",
			err)
	}

	// Sanity check that the same number of channels were returned
	// as requested.
	if len(rows) != len(dbChanIDs) {
		return fmt.Errorf("expected to fetch %d channels, "+
			"but got %d", len(dbChanIDs), len(rows))
	}

	// Collect all policy IDs needed for batch data loading.
	dbPolicyIDs := make([]int64, 0, len(dbChanIDs)*2)

	for _, row := range rows {
		scid := byteOrder.Uint64(row.GraphChannel.Scid)

		dbPol1, dbPol2, err := extractChannelPolicies(row)
		if err != nil {
			return fmt.Errorf("could not extract channel policies"+
				" for SCID %d: %w", scid, err)
		}
		if dbPol1 != nil {
			dbPolicyIDs = append(dbPolicyIDs, dbPol1.ID)
		}
		if dbPol2 != nil {
			dbPolicyIDs = append(dbPolicyIDs, dbPol2.ID)
		}
	}

	// Batch load all channel and policy data (features, extras).
	batchData, err := batchLoadChannelData(
		ctx, cfg.QueryCfg, sqlDB, dbChanIDs, dbPolicyIDs,
	)
	if err != nil {
		return fmt.Errorf("could not batch load channel and policy "+
			"data: %w", err)
	}

	// Validate each channel in the batch using pre-loaded data.
	for _, row := range rows {
		kvdbChan, ok := batch[row.GraphChannel.ID]
		if !ok {
			return fmt.Errorf("channel with ID %d not found "+
				"in batch", row.GraphChannel.ID)
		}

		scid := byteOrder.Uint64(row.GraphChannel.Scid)

		err = validateMigratedChannelWithBatchData(
			cfg, scid, kvdbChan, row, batchData,
		)
		if err != nil {
			return fmt.Errorf("channel %d validation failed "+
				"after migration: %w", scid, err)
		}
	}

	return nil
}

// validateMigratedChannelWithBatchData validates a single migrated channel
// using pre-fetched batch data for optimal performance.
func validateMigratedChannelWithBatchData(cfg *SQLStoreConfig,
	scid uint64, info *migChanInfo, row sqlc.GetChannelsByIDsRow,
	batchData *batchChannelData) error {

	dbChanInfo := info.dbInfo
	channel := info.edge

	// Assert that the DB IDs for the channel and nodes are as expected
	// given the inserted channel info.
	err := sqldb.CompareRecords(
		dbChanInfo.channelID, row.GraphChannel.ID, "channel DB ID",
	)
	if err != nil {
		return err
	}
	err = sqldb.CompareRecords(
		dbChanInfo.node1ID, row.Node1ID, "node1 DB ID",
	)
	if err != nil {
		return err
	}
	err = sqldb.CompareRecords(
		dbChanInfo.node2ID, row.Node2ID, "node2 DB ID",
	)
	if err != nil {
		return err
	}

	// Build node vertices from the row data.
	node1, node2, err := buildNodeVertices(
		row.Node1PubKey, row.Node2PubKey,
	)
	if err != nil {
		return err
	}

	// Build channel info using batch data.
	migChan, err := buildEdgeInfoWithBatchData(
		cfg.ChainHash, row.GraphChannel, node1, node2, batchData,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel info: %w",
			err)
	}

	// Extract channel policies from the row.
	dbPol1, dbPol2, err := extractChannelPolicies(row)
	if err != nil {
		return fmt.Errorf("could not extract channel policies: %w", err)
	}

	// Build channel policies using batch data.
	migPol1, migPol2, err := buildChanPoliciesWithBatchData(
		dbPol1, dbPol2, scid, node1, node2, batchData,
	)
	if err != nil {
		return fmt.Errorf("could not build migrated channel "+
			"policies: %w", err)
	}

	// Finally, compare the original channel info and
	// policies with the migrated ones to ensure they match.
	if len(channel.ExtraOpaqueData) == 0 {
		channel.ExtraOpaqueData = nil
	}
	if len(migChan.ExtraOpaqueData) == 0 {
		migChan.ExtraOpaqueData = nil
	}

	err = sqldb.CompareRecords(
		channel, migChan, fmt.Sprintf("channel %d", scid),
	)
	if err != nil {
		return err
	}

	checkPolicy := func(expPolicy,
		migPolicy *models.ChannelEdgePolicy) error {

		switch {
		// Both policies are nil, nothing to compare.
		case expPolicy == nil && migPolicy == nil:
			return nil

		// One of the policies is nil, but the other is not.
		case expPolicy == nil || migPolicy == nil:
			return fmt.Errorf("expected both policies to be "+
				"non-nil. Got expPolicy: %v, "+
				"migPolicy: %v", expPolicy, migPolicy)

		// Both policies are non-nil, we can compare them.
		default:
		}

		if len(expPolicy.ExtraOpaqueData) == 0 {
			expPolicy.ExtraOpaqueData = nil
		}
		if len(migPolicy.ExtraOpaqueData) == 0 {
			migPolicy.ExtraOpaqueData = nil
		}

		return sqldb.CompareRecords(
			*expPolicy, *migPolicy, "channel policy",
		)
	}

	err = checkPolicy(info.policy1, migPol1)
	if err != nil {
		return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
			err)
	}

	err = checkPolicy(info.policy2, migPol2)
	if err != nil {
		return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
			err)
	}

	return nil
}

// migratePruneLog migrates the prune log from the KV backend to the SQL
// database. It collects entries in batches, inserts them individually, and then
// validates them in batches using GetPruneEntriesForHeights for better
// performance.
func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		count          uint64
		pruneTipHeight uint32
		pruneTipHash   chainhash.Hash

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	batch := make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)

	// validateBatch validates a batch of prune entries using batch query.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		// Extract heights for the batch query.
		heights := make([]int64, 0, len(batch))
		for height := range batch {
			heights = append(heights, int64(height))
		}

		// Batch fetch all entries from the database.
		rows, err := sqlDB.GetPruneEntriesForHeights(ctx, heights)
		if err != nil {
			return fmt.Errorf("could not batch get prune "+
				"entries: %w", err)
		}

		if len(rows) != len(batch) {
			return fmt.Errorf("expected to fetch %d prune "+
				"entries, but got %d", len(batch),
				len(rows))
		}

		// Validate each entry in the batch.
		for _, row := range rows {
			kvdbHash, ok := batch[uint32(row.BlockHeight)]
			if !ok {
				return fmt.Errorf("prune entry for height %d "+
					"not found in batch", row.BlockHeight)
			}

			err := sqldb.CompareRecords(
				kvdbHash[:], row.BlockHash,
				fmt.Sprintf("prune log entry at height %d",
					row.BlockHeight),
			)
			if err != nil {
				return err
			}
		}

		// Reset the batch map for the next iteration.
		batch = make(map[uint32]chainhash.Hash, cfg.MaxBatchSize)

		return nil
	}

	// Iterate over each prune log entry in the KV store and migrate it to
	// the SQL database.
	err := forEachPruneLogEntry(
		kvBackend, func(height uint32, hash *chainhash.Hash) error {
			count++
			chunk++

			// Keep track of the prune tip height and hash.
			if height > pruneTipHeight {
				pruneTipHeight = height
				pruneTipHash = *hash
			}

			// Insert the entry (individual inserts for now).
			err := sqlDB.UpsertPruneLogEntry(
				ctx, sqlc.UpsertPruneLogEntryParams{
					BlockHeight: int64(height),
					BlockHash:   hash[:],
				},
			)
			if err != nil {
				return fmt.Errorf("unable to insert prune log "+
					"entry for height %d: %w", height, err)
			}

			// Add to validation batch.
			batch[height] = *hash

			// Validate batch when full.
			if len(batch) >= int(cfg.MaxBatchSize) {
				err := validateBatch()
				if err != nil {
					return fmt.Errorf("batch "+
						"validation failed: %w", err)
				}
			}

			s.Do(func() {
				elapsed := time.Since(t0).Seconds()
				ratePerSec := float64(chunk) / elapsed
				log.Debugf("Migrated %d prune log "+
					"entries (%.2f entries/sec)",
					count, ratePerSec)

				t0 = time.Now()
				chunk = 0
			})

			return nil
		},
	)
	if err != nil {
		return fmt.Errorf("could not migrate prune log: %w", err)
	}

	// Validate any remaining entries in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	// Check that the prune tip is set correctly in the SQL
	// database.
	pruneTip, err := sqlDB.GetPruneTip(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		// A sql.ErrNoRows error (the SQL equivalent of
		// ErrGraphNeverPruned) is expected if no prune log entries
		// were migrated from the kvdb store. Otherwise, it's an
		// unexpected error.
		if count == 0 {
			log.Infof("No prune log entries found in KV store " +
				"to migrate")
			return nil
		}
		// Fall-through to the next error check.
	}
	if err != nil {
		return fmt.Errorf("could not get prune tip: %w", err)
	}

	if pruneTip.BlockHeight != int64(pruneTipHeight) ||
		!bytes.Equal(pruneTip.BlockHash, pruneTipHash[:]) {

		return fmt.Errorf("prune tip mismatch after migration: "+
			"expected height %d, hash %s; got height %d, "+
			"hash %s", pruneTipHeight, pruneTipHash,
			pruneTip.BlockHeight,
			chainhash.Hash(pruneTip.BlockHash))
	}

	log.Infof("Migrated %d prune log entries from KV to SQL in %s. "+
		"The prune tip is: height %d, hash: %s", count,
		time.Since(totalTime), pruneTipHeight, pruneTipHash)

	return nil
}
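
// pruneTipSketch is an illustrative sketch (not part of the original file) of
// the prune-tip bookkeeping performed by migratePruneLog above: while the KV
// prune log is iterated, the entry with the greatest block height is
// remembered, and that height/hash pair is what the migrated store's
// GetPruneTip result is later checked against.
func pruneTipSketch(entries map[uint32]chainhash.Hash) (uint32,
	chainhash.Hash) {

	var (
		tipHeight uint32
		tipHash   chainhash.Hash
	)

	for height, hash := range entries {
		if height > tipHeight {
			tipHeight = height
			tipHash = hash
		}
	}

	return tipHeight, tipHash
}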

// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
	hash *chainhash.Hash) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		metaBucket := tx.ReadBucket(graphMetaBucket)
		if metaBucket == nil {
			return ErrGraphNotFound
		}

		pruneBucket := metaBucket.NestedReadBucket(pruneLogBucket)
		if pruneBucket == nil {
			// The graph has never been pruned and so, there are no
			// entries to iterate over.
			return nil
		}

		return pruneBucket.ForEach(func(k, v []byte) error {
			blockHeight := byteOrder.Uint32(k)
			var blockHash chainhash.Hash
			copy(blockHash[:], v)

			return cb(blockHeight, &blockHash)
		})
	}, func() {})
}

// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
// the SQL database. It collects SCIDs in batches, inserts them individually,
// and then validates them in batches using GetClosedChannelsSCIDs for better
// performance.
func migrateClosedSCIDIndex(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		count uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	batch := make([][]byte, 0, cfg.MaxBatchSize)

	// validateBatch validates a batch of closed SCIDs using batch query.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		// Batch fetch all closed SCIDs from the database.
		dbSCIDs, err := sqlDB.GetClosedChannelsSCIDs(ctx, batch)
		if err != nil {
			return fmt.Errorf("could not batch get closed "+
				"SCIDs: %w", err)
		}

		// Create set of SCIDs that exist in the database for quick
		// lookup.
		dbSCIDSet := make(map[string]struct{})
		for _, scid := range dbSCIDs {
			dbSCIDSet[string(scid)] = struct{}{}
		}

		// Validate each SCID in the batch.
		for _, expectedSCID := range batch {
			if _, found := dbSCIDSet[string(expectedSCID)]; !found {
				return fmt.Errorf("closed SCID %x not found "+
					"in database", expectedSCID)
			}
		}

		// Reset the batch for the next iteration.
		batch = make([][]byte, 0, cfg.MaxBatchSize)

		return nil
	}

	migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
		count++
		chunk++

		chanIDB := channelIDToBytes(scid.ToUint64())
		err := sqlDB.InsertClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not insert closed channel "+
				"with SCID %s: %w", scid, err)
		}

		// Add to validation batch.
		batch = append(batch, chanIDB)

		// Validate batch when full.
		if len(batch) >= int(cfg.MaxBatchSize) {
			err := validateBatch()
			if err != nil {
				return fmt.Errorf("batch validation failed: %w",
					err)
			}
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d closed scids "+
				"(%.2f entries/sec)", count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	}

	err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
	if err != nil {
		return fmt.Errorf("could not migrate closed SCID index: %w",
			err)
	}

	// Validate any remaining SCIDs in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	log.Infof("Migrated %d closed SCIDs from KV to SQL in %s", count,
		time.Since(totalTime))

	return nil
}
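
// scidKeyRoundTripSketch is an illustrative sketch (not part of the original
// file) of the SCID key encoding relied upon by the closed-SCID and zombie
// index migrations above. It assumes channelIDToBytes encodes the 8-byte key
// with the same package-level byteOrder that this file uses for decoding;
// under that assumption the encode/decode pair round-trips the raw channel
// ID.
func scidKeyRoundTripSketch(scid lnwire.ShortChannelID) bool {
	chanIDB := channelIDToBytes(scid.ToUint64())

	return byteOrder.Uint64(chanIDB) == scid.ToUint64()
}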

// migrateZombieIndex migrates the zombie index from the KV backend to the SQL
// database. It collects zombie channels in batches, inserts them individually,
// and validates them in batches.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,
	kvBackend kvdb.Backend, sqlDB SQLQueries) error {

	var (
		totalTime = time.Now()

		count uint64

		t0    = time.Now()
		chunk uint64
		s     = rate.Sometimes{
			Interval: 10 * time.Second,
		}
	)

	type zombieEntry struct {
		pub1 route.Vertex
		pub2 route.Vertex
	}

	batch := make(map[uint64]*zombieEntry, cfg.MaxBatchSize)

	// validateBatch validates a batch of zombie SCIDs using batch query.
	validateBatch := func() error {
		if len(batch) == 0 {
			return nil
		}

		scids := make([][]byte, 0, len(batch))
		for scid := range batch {
			scids = append(scids, channelIDToBytes(scid))
		}

		// Batch fetch all zombie channels from the database.
		rows, err := sqlDB.GetZombieChannelsSCIDs(
			ctx, sqlc.GetZombieChannelsSCIDsParams{
				Version: int16(ProtocolV1),
				Scids:   scids,
			},
		)
		if err != nil {
			return fmt.Errorf("could not batch get zombie "+
				"SCIDs: %w", err)
		}

		// Make sure that the number of rows returned matches
		// the number of SCIDs we requested.
		if len(rows) != len(scids) {
			return fmt.Errorf("expected to fetch %d zombie "+
				"SCIDs, but got %d", len(scids), len(rows))
		}

		// Validate each row is in the batch.
		for _, row := range rows {
			scid := byteOrder.Uint64(row.Scid)

			kvdbZombie, ok := batch[scid]
			if !ok {
				return fmt.Errorf("zombie SCID %x not found "+
					"in batch", scid)
			}

			err = sqldb.CompareRecords(
				kvdbZombie.pub1[:], row.NodeKey1,
				fmt.Sprintf("zombie pub key 1 (%s) for "+
					"channel %d", kvdbZombie.pub1, scid),
			)
			if err != nil {
				return err
			}

			err = sqldb.CompareRecords(
				kvdbZombie.pub2[:], row.NodeKey2,
				fmt.Sprintf("zombie pub key 2 (%s) for "+
					"channel %d", kvdbZombie.pub2, scid),
			)
			if err != nil {
				return err
			}
		}

		// Reset the batch for the next iteration.
		batch = make(map[uint64]*zombieEntry, cfg.MaxBatchSize)

		return nil
	}

	err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
		pubKey2 [33]byte) error {

		chanIDB := channelIDToBytes(chanID)

		// If it is in the closed SCID index, we don't need to
		// add it to the zombie index.
		//
		// NOTE: this means that the resulting zombie index count in
		// the SQL store may well be less than the count of zombie
		// channels in the KV store.
		isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
		if err != nil {
			return fmt.Errorf("could not check closed "+
				"channel: %w", err)
		}
		if isClosed {
			return nil
		}

		count++
		chunk++

		err = sqlDB.UpsertZombieChannel(
			ctx, sqlc.UpsertZombieChannelParams{
				Version:  int16(ProtocolV1),
				Scid:     chanIDB,
				NodeKey1: pubKey1[:],
				NodeKey2: pubKey2[:],
			},
		)
		if err != nil {
			return fmt.Errorf("could not upsert zombie "+
				"channel %d: %w", chanID, err)
		}

		// Add to validation batch only after successful insertion.
		batch[chanID] = &zombieEntry{
			pub1: pubKey1,
			pub2: pubKey2,
		}

		// Validate batch when full.
		if len(batch) >= int(cfg.MaxBatchSize) {
			err := validateBatch()
			if err != nil {
				return fmt.Errorf("batch validation failed: %w",
					err)
			}
		}

		s.Do(func() {
			elapsed := time.Since(t0).Seconds()
			ratePerSec := float64(chunk) / elapsed
			log.Debugf("Migrated %d zombie index entries "+
				"(%.2f entries/sec)", count, ratePerSec)

			t0 = time.Now()
			chunk = 0
		})

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not migrate zombie index: %w", err)
	}

	// Validate any remaining zombie SCIDs in the batch.
	if len(batch) > 0 {
		err := validateBatch()
		if err != nil {
			return fmt.Errorf("final batch validation failed: %w",
				err)
		}
	}

	log.Infof("Migrated %d zombie channels from KV to SQL in %s", count,
		time.Since(totalTime))

	return nil
}

// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
	pubKey2 [33]byte) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		return zombieIndex.ForEach(func(k, v []byte) error {
			var pubKey1, pubKey2 [33]byte
			copy(pubKey1[:], v[:33])
			copy(pubKey2[:], v[33:])

			return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
		})
	}, func() {})
}
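
// zombieEntrySketch is an illustrative sketch (not part of the original file)
// of the zombie-index entry layout that forEachZombieEntry above relies on:
// the bucket key is the 8-byte channel ID and the value is the concatenation
// of the two 33-byte node public keys. The explicit length checks are an
// addition for illustration; the original callback assumes well-formed
// entries.
func zombieEntrySketch(k, v []byte) (uint64, [33]byte, [33]byte, error) {
	var pubKey1, pubKey2 [33]byte

	if len(k) != 8 || len(v) != 66 {
		return 0, pubKey1, pubKey2, fmt.Errorf("unexpected zombie "+
			"entry sizes: key=%d bytes, value=%d bytes", len(k),
			len(v))
	}

	copy(pubKey1[:], v[:33])
	copy(pubKey2[:], v[33:])

	return byteOrder.Uint64(k), pubKey1, pubKey2, nil
}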

// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
	cb func(lnwire.ShortChannelID) error) error {

	return kvdb.View(db, func(tx kvdb.RTx) error {
		closedScids := tx.ReadBucket(closedScidBucket)
		if closedScids == nil {
			return nil
		}

		return closedScids.ForEach(func(k, _ []byte) error {
			return cb(lnwire.NewShortChanIDFromInt(
				byteOrder.Uint64(k),
			))
		})
	}, func() {})
}