• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16651762882

31 Jul 2025 02:26PM UTC coverage: 67.045% (-0.002%) from 67.047%
16651762882

Pull #9993

github

web-flow
Merge d00c6f917 into 37523b6cb
Pull Request #9993: Validate UTF-8 description and empty route hints when parsing BOLT-11 invoices

7 of 7 new or added lines in 1 file covered. (100.0%)

82 existing lines in 17 files now uncovered.

135566 of 202201 relevant lines covered (67.05%)

21685.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.84
/channeldb/migration30/migration.go
1
package migration30
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "math"
8
        "sync"
9

10
        mig24 "github.com/lightningnetwork/lnd/channeldb/migration24"
11
        mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
12
        mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
)
15

16
// recordsPerTx specifies the number of records to be migrated in each database
17
// transaction. In the worst case, each old revocation log is 28,057 bytes.
18
// 20,000 records would consume 0.56 GB of ram, which is feasible for a modern
19
// machine.
20
//
21
// NOTE: we could've used more ram but it doesn't help with the speed of the
22
// migration since most of the CPU time is used for calculating the output
23
// indexes.
24
const recordsPerTx = 20_000
25

26
// MigrateRevLogConfig is an interface that defines the config that should be
27
// passed to the MigrateRevocationLog function.
28
type MigrateRevLogConfig interface {
29
        // GetNoAmountData returns true if the amount data of revoked commitment
30
        // transactions should not be stored in the revocation log. When it
        // returns true, the converted logs are created without the OurBalance
        // and TheirBalance fields set.
31
        GetNoAmountData() bool
32
}
33

34
// MigrateRevLogConfigImpl implements the MigrateRevLogConfig interface.
35
type MigrateRevLogConfigImpl struct {
36
        // NoAmountData if set to true will result in the amount data of revoked
37
        // commitment transactions not being stored in the revocation log.
38
        NoAmountData bool
39
}
40

41
// GetNoAmountData returns true if the amount data of revoked commitment
42
// transactions should not be stored in the revocation log. It reports the
// value of the NoAmountData field as-is.
43
func (c *MigrateRevLogConfigImpl) GetNoAmountData() bool {
2,826✔
44
        return c.NoAmountData
2,826✔
45
}
2,826✔
46

47
// MigrateRevocationLog migrates the old revocation logs into the newer format
48
// and deletes them once finished, with the deletion only happens once ALL the
49
// old logs have been migrates.
50
func MigrateRevocationLog(db kvdb.Backend, cfg MigrateRevLogConfig) error {
900✔
51
        log.Infof("Migrating revocation logs, might take a while...")
900✔
52

900✔
53
        var (
900✔
54
                err error
900✔
55

900✔
56
                // finished is used to exit the for loop.
900✔
57
                finished bool
900✔
58

900✔
59
                // total is the number of total records.
900✔
60
                total uint64
900✔
61

900✔
62
                // migrated is the number of already migrated records.
900✔
63
                migrated uint64
900✔
64
        )
900✔
65

900✔
66
        // First of all, read the stats of the revocation logs.
900✔
67
        total, migrated, err = logMigrationStat(db)
900✔
68
        if err != nil {
900✔
69
                return err
×
70
        }
×
71
        log.Infof("Total logs=%d, migrated=%d", total, migrated)
900✔
72

900✔
73
        // Exit early if the old logs have already been migrated and deleted.
900✔
74
        if total == 0 {
936✔
75
                log.Info("Migration already finished!")
36✔
76
                return nil
36✔
77
        }
36✔
78

79
        for {
3,348✔
80
                if finished {
3,348✔
81
                        log.Infof("Migrating old revocation logs finished, " +
864✔
82
                                "now checking the migration results...")
864✔
83
                        break
864✔
84
                }
85

86
                // Process the migration.
87
                err = kvdb.Update(db, func(tx kvdb.RwTx) error {
3,240✔
88
                        finished, err = processMigration(tx, cfg)
1,620✔
89
                        if err != nil {
1,620✔
90
                                return err
×
91
                        }
×
92
                        return nil
1,620✔
93
                }, func() {})
1,620✔
94
                if err != nil {
1,620✔
95
                        return err
×
96
                }
×
97

98
                // Each time we finished the above process, we'd read the stats
99
                // again to understand the current progress.
100
                total, migrated, err = logMigrationStat(db)
1,620✔
101
                if err != nil {
1,620✔
102
                        return err
×
103
                }
×
104

105
                // Calculate and log the progress if the progress is less than
106
                // one.
107
                progress := float64(migrated) / float64(total) * 100
1,620✔
108
                if progress >= 100 {
3,240✔
109
                        continue
1,620✔
110
                }
111

112
                log.Infof("Migration progress: %.3f%%, still have: %d",
×
113
                        progress, total-migrated)
×
114
        }
115

116
        // Before we can safety delete the old buckets, we perform a check to
117
        // make sure the logs are migrated as expected.
118
        err = kvdb.Update(db, validateMigration, func() {})
1,728✔
119
        if err != nil {
864✔
120
                return fmt.Errorf("validate migration failed: %w", err)
×
121
        }
×
122

123
        log.Info("Migration check passed, now deleting the old logs...")
864✔
124

864✔
125
        // Once the migration completes, we can now safety delete the old
864✔
126
        // revocation logs.
864✔
127
        if err := deleteOldBuckets(db); err != nil {
864✔
128
                return fmt.Errorf("deleteOldBuckets err: %w", err)
×
129
        }
×
130

131
        log.Info("Old revocation log buckets removed!")
864✔
132
        return nil
864✔
133
}
134

135
// processMigration finds the next un-migrated revocation logs, reads a max
136
// number of `recordsPerTx` records, converts them into the new revocation logs
137
// and save them to disk.
138
func processMigration(tx kvdb.RwTx, cfg MigrateRevLogConfig) (bool, error) {
1,620✔
139
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
1,620✔
140

1,620✔
141
        // If no bucket is found, we can exit early.
1,620✔
142
        if openChanBucket == nil {
1,620✔
143
                return false, fmt.Errorf("root bucket not found")
×
144
        }
×
145

146
        // Locate the next migration height.
147
        locator, err := locateNextUpdateNum(openChanBucket)
1,620✔
148
        if err != nil {
1,620✔
149
                return false, fmt.Errorf("locator got error: %w", err)
×
150
        }
×
151

152
        // If the returned locator is nil, we've done migrating the logs.
153
        if locator == nil {
2,484✔
154
                return true, nil
864✔
155
        }
864✔
156

157
        // Read a list of old revocation logs.
158
        entryMap, err := readOldRevocationLogs(openChanBucket, locator, cfg)
756✔
159
        if err != nil {
756✔
160
                return false, fmt.Errorf("read old logs err: %w", err)
×
161
        }
×
162

163
        // Migrate the revocation logs.
164
        return false, writeRevocationLogs(openChanBucket, entryMap)
756✔
165
}
166

167
// deleteOldBuckets iterates all the channel buckets and deletes the old
168
// revocation buckets.
169
func deleteOldBuckets(db kvdb.Backend) error {
864✔
170
        // locators records all the chan buckets found in the database.
864✔
171
        var locators []*updateLocator
864✔
172

864✔
173
        // reader is a helper closure that saves the locator found. Each
864✔
174
        // locator is relatively small(33+32+36+8=109 bytes), assuming 1 GB of
864✔
175
        // ram we can fit roughly 10 million records. Since each record
864✔
176
        // corresponds to a channel, we should have more than enough memory to
864✔
177
        // read them all.
864✔
178
        reader := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam
3,444✔
179
                locators = append(locators, l)
2,580✔
180
                return nil
2,580✔
181
        }
2,580✔
182

183
        // remover is a helper closure that removes the old revocation log
184
        // bucket under the specified chan bucket by the given locator.
185
        remover := func(rootBucket kvdb.RwBucket, l *updateLocator) error {
3,444✔
186
                chanBucket, err := l.locateChanBucket(rootBucket)
2,580✔
187
                if err != nil {
2,580✔
188
                        return err
×
189
                }
×
190

191
                return chanBucket.DeleteNestedBucket(
2,580✔
192
                        revocationLogBucketDeprecated,
2,580✔
193
                )
2,580✔
194
        }
195

196
        // Perform the deletion in one db transaction. This should not cause
197
        // any memory issue as the deletion doesn't load any data from the
198
        // buckets.
199
        return kvdb.Update(db, func(tx kvdb.RwTx) error {
1,728✔
200
                openChanBucket := tx.ReadWriteBucket(openChannelBucket)
864✔
201

864✔
202
                // Exit early if there's no bucket.
864✔
203
                if openChanBucket == nil {
864✔
204
                        return nil
×
205
                }
×
206

207
                // Iterate the buckets to find all the locators.
208
                err := iterateBuckets(openChanBucket, nil, reader)
864✔
209
                if err != nil {
864✔
210
                        return err
×
211
                }
×
212

213
                // Iterate the locators and delete all the old revocation log
214
                // buckets.
215
                for _, l := range locators {
3,444✔
216
                        err := remover(openChanBucket, l)
2,580✔
217
                        // If the bucket doesn't exist, we can exit safety.
2,580✔
218
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,580✔
219
                                return err
×
220
                        }
×
221
                }
222

223
                return nil
864✔
224
        }, func() {})
864✔
225
}
226

227
// writeRevocationLogs unwraps the entryMap and writes the new revocation logs.
228
func writeRevocationLogs(openChanBucket kvdb.RwBucket,
229
        entryMap logEntries) error {
756✔
230

756✔
231
        for locator, logs := range entryMap {
2,358✔
232
                // Find the channel bucket.
1,602✔
233
                chanBucket, err := locator.locateChanBucket(openChanBucket)
1,602✔
234
                if err != nil {
1,602✔
235
                        return fmt.Errorf("locateChanBucket err: %w", err)
×
236
                }
×
237

238
                // Create the new log bucket.
239
                logBucket, err := chanBucket.CreateBucketIfNotExists(
1,602✔
240
                        revocationLogBucket,
1,602✔
241
                )
1,602✔
242
                if err != nil {
1,602✔
243
                        return fmt.Errorf("create log bucket err: %w", err)
×
244
                }
×
245

246
                // Write the new logs.
247
                for _, entry := range logs {
4,428✔
248
                        var b bytes.Buffer
2,826✔
249
                        err := serializeRevocationLog(&b, entry.log)
2,826✔
250
                        if err != nil {
2,826✔
251
                                return err
×
252
                        }
×
253

254
                        logEntrykey := mig24.MakeLogKey(entry.commitHeight)
2,826✔
255
                        err = logBucket.Put(logEntrykey[:], b.Bytes())
2,826✔
256
                        if err != nil {
2,826✔
257
                                return fmt.Errorf("putRevocationLog err: %w",
×
258
                                        err)
×
259
                        }
×
260
                }
261
        }
262

263
        return nil
756✔
264
}
265

266
// logMigrationStat reads the buckets to provide stats over current migration
267
// progress. The returned values are the numbers of total records and already
268
// migrated records.
269
func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) {
2,520✔
270
        var (
2,520✔
271
                err error
2,520✔
272

2,520✔
273
                // total is the number of total records.
2,520✔
274
                total uint64
2,520✔
275

2,520✔
276
                // unmigrated is the number of unmigrated records.
2,520✔
277
                unmigrated uint64
2,520✔
278
        )
2,520✔
279

2,520✔
280
        err = kvdb.Update(db, func(tx kvdb.RwTx) error {
5,040✔
281
                total, unmigrated, err = fetchLogStats(tx)
2,520✔
282
                return err
2,520✔
283
        }, func() {})
5,040✔
284

285
        log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated)
2,520✔
286
        return total, total - unmigrated, err
2,520✔
287
}
288

289
// fetchLogStats iterates all the chan buckets to provide stats about the logs.
290
// The returned values are num of total records, and num of un-migrated
291
// records.
292
func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) {
2,520✔
293
        var (
2,520✔
294
                total           uint64
2,520✔
295
                totalUnmigrated uint64
2,520✔
296
        )
2,520✔
297

2,520✔
298
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
2,520✔
299

2,520✔
300
        // If no bucket is found, we can exit early.
2,520✔
301
        if openChanBucket == nil {
2,520✔
302
                return 0, 0, fmt.Errorf("root bucket not found")
×
303
        }
×
304

305
        // counter is a helper closure used to count the number of records
306
        // based on the given bucket.
307
        counter := func(chanBucket kvdb.RwBucket, bucket []byte) uint64 {
11,928✔
308
                // Read the sub-bucket level 4.
9,408✔
309
                logBucket := chanBucket.NestedReadBucket(bucket)
9,408✔
310

9,408✔
311
                // Exit early if we don't have the bucket.
9,408✔
312
                if logBucket == nil {
11,442✔
313
                        return 0
2,034✔
314
                }
2,034✔
315

316
                // Jump to the end of the cursor.
317
                key, _ := logBucket.ReadCursor().Last()
7,374✔
318

7,374✔
319
                // Since the CommitHeight is a zero-based monotonically
7,374✔
320
                // increased index, its value plus one reflects the total
7,374✔
321
                // records under this chan bucket.
7,374✔
322
                lastHeight := binary.BigEndian.Uint64(key) + 1
7,374✔
323

7,374✔
324
                return lastHeight
7,374✔
325
        }
326

327
        // countTotal is a callback function used to count the total number of
328
        // records.
329
        countTotal := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
10,044✔
330
                total += counter(chanBucket, revocationLogBucketDeprecated)
7,524✔
331
                return nil
7,524✔
332
        }
7,524✔
333

334
        // countUnmigrated is a callback function used to count the total
335
        // number of un-migrated records.
336
        countUnmigrated := func(chanBucket kvdb.RwBucket,
2,520✔
337
                l *updateLocator) error {
4,404✔
338

1,884✔
339
                totalUnmigrated += counter(
1,884✔
340
                        chanBucket, revocationLogBucketDeprecated,
1,884✔
341
                )
1,884✔
342
                return nil
1,884✔
343
        }
1,884✔
344

345
        // Locate the next migration height.
346
        locator, err := locateNextUpdateNum(openChanBucket)
2,520✔
347
        if err != nil {
2,520✔
348
                return 0, 0, fmt.Errorf("locator got error: %w", err)
×
349
        }
×
350

351
        // If the returned locator is not nil, we still have un-migrated
352
        // records so we need to count them. Otherwise we've done migrating the
353
        // logs.
354
        if locator != nil {
3,276✔
355
                err = iterateBuckets(openChanBucket, locator, countUnmigrated)
756✔
356
                if err != nil {
756✔
357
                        return 0, 0, err
×
358
                }
×
359
        }
360

361
        // Count the total number of records by supplying a nil locator.
362
        err = iterateBuckets(openChanBucket, nil, countTotal)
2,520✔
363
        if err != nil {
2,520✔
364
                return 0, 0, err
×
365
        }
×
366

367
        return total, totalUnmigrated, err
2,520✔
368
}
369

370
// logEntry houses the info needed to write a new revocation log.
371
type logEntry struct {
372
        // log is the converted revocation log that gets serialized and
        // written to the new log bucket.
        log          *RevocationLog
373
        // commitHeight is the commit height of the old log, it's used to
        // build the key under which the new log is stored.
        commitHeight uint64
374
        // ourIndex and theirIndex are the output indexes that were found for
        // the commitment before converting it.
        ourIndex     uint32
375
        theirIndex   uint32
376
        // locator points to the channel bucket the log was read from.
        locator      *updateLocator
377
}
378

379
// logEntries maps a bucket locator to a list of entries under that bucket.
// The map is keyed by the locator pointer, so entries are grouped per locator
// instance rather than per locator value.
380
type logEntries map[*updateLocator][]*logEntry
381

382
// result is made of two channels that are used to send back either the
383
// constructed new revocation log or an error.
384
type result struct {
385
        newLog  chan *logEntry
386
        errChan chan error
387
}
388

389
// readOldRevocationLogs finds a list of old revocation logs and converts them
390
// into the new revocation logs.
391
func readOldRevocationLogs(openChanBucket kvdb.RwBucket,
392
        locator *updateLocator, cfg MigrateRevLogConfig) (logEntries, error) {
756✔
393

756✔
394
        entries := make(logEntries)
756✔
395
        results := make([]*result, 0)
756✔
396

756✔
397
        var wg sync.WaitGroup
756✔
398

756✔
399
        // collectLogs is a helper closure that reads all newly created
756✔
400
        // revocation logs sent over the result channels.
756✔
401
        //
756✔
402
        // NOTE: the order of the logs cannot be guaranteed, which is fine as
756✔
403
        // boltdb will take care of the orders when saving them.
756✔
404
        collectLogs := func() error {
1,512✔
405
                wg.Wait()
756✔
406

756✔
407
                for _, r := range results {
3,582✔
408
                        select {
2,826✔
409
                        case entry := <-r.newLog:
2,826✔
410
                                entries[entry.locator] = append(
2,826✔
411
                                        entries[entry.locator], entry,
2,826✔
412
                                )
2,826✔
413

414
                        case err := <-r.errChan:
×
415
                                return err
×
416
                        }
417
                }
418

419
                return nil
756✔
420
        }
421

422
        // createLog is a helper closure that constructs a new revocation log.
423
        //
424
        // NOTE: used as a goroutine.
425
        createLog := func(chanState *mig26.OpenChannel,
756✔
426
                c mig.ChannelCommitment, l *updateLocator, r *result) {
3,582✔
427

2,826✔
428
                defer wg.Done()
2,826✔
429

2,826✔
430
                // Find the output indexes.
2,826✔
431
                ourIndex, theirIndex, err := findOutputIndexes(chanState, &c)
2,826✔
432
                if err != nil {
2,826✔
433
                        r.errChan <- err
×
434
                }
×
435

436
                // Convert the old logs into the new logs. We do this early in
437
                // the read tx so the old large revocation log can be set to
438
                // nil here so save us some memory space.
439
                newLog, err := convertRevocationLog(
2,826✔
440
                        &c, ourIndex, theirIndex, cfg.GetNoAmountData(),
2,826✔
441
                )
2,826✔
442
                if err != nil {
2,826✔
443
                        r.errChan <- err
×
444
                }
×
445
                // Create the entry that will be used to create the new log.
446
                entry := &logEntry{
2,826✔
447
                        log:          newLog,
2,826✔
448
                        commitHeight: c.CommitHeight,
2,826✔
449
                        ourIndex:     ourIndex,
2,826✔
450
                        theirIndex:   theirIndex,
2,826✔
451
                        locator:      l,
2,826✔
452
                }
2,826✔
453

2,826✔
454
                r.newLog <- entry
2,826✔
455
        }
456

457
        // innerCb is the stepping function used when iterating the old log
458
        // bucket.
459
        innerCb := func(chanState *mig26.OpenChannel, l *updateLocator,
756✔
460
                _, v []byte) error {
3,582✔
461

2,826✔
462
                reader := bytes.NewReader(v)
2,826✔
463
                c, err := mig.DeserializeChanCommit(reader)
2,826✔
464
                if err != nil {
2,826✔
465
                        return err
×
466
                }
×
467

468
                r := &result{
2,826✔
469
                        newLog:  make(chan *logEntry, 1),
2,826✔
470
                        errChan: make(chan error, 1),
2,826✔
471
                }
2,826✔
472
                results = append(results, r)
2,826✔
473

2,826✔
474
                // We perform the log creation in a goroutine as it takes some
2,826✔
475
                // time to compute and find output indexes.
2,826✔
476
                wg.Add(1)
2,826✔
477
                go createLog(chanState, c, l, r)
2,826✔
478

2,826✔
479
                // Check the records read so far and signals exit when we've
2,826✔
480
                // reached our memory cap.
2,826✔
481
                if len(results) >= recordsPerTx {
2,826✔
482
                        return errExit
×
483
                }
×
484

485
                return nil
2,826✔
486
        }
487

488
        // cb is the callback function to be used when iterating the buckets.
489
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
2,640✔
490
                // Read the open channel.
1,884✔
491
                c := &mig26.OpenChannel{}
1,884✔
492
                err := mig26.FetchChanInfo(chanBucket, c, false)
1,884✔
493
                if err != nil {
1,884✔
494
                        return fmt.Errorf("unable to fetch chan info: %w", err)
×
495
                }
×
496

497
                err = fetchChanRevocationState(chanBucket, c)
1,884✔
498
                if err != nil {
1,884✔
499
                        return fmt.Errorf("unable to fetch revocation "+
×
500
                                "state: %v", err)
×
501
                }
×
502

503
                // Read the sub-bucket level 4.
504
                logBucket := chanBucket.NestedReadBucket(
1,884✔
505
                        revocationLogBucketDeprecated,
1,884✔
506
                )
1,884✔
507
                // Exit early if we don't have the old bucket.
1,884✔
508
                if logBucket == nil {
2,166✔
509
                        return nil
282✔
510
                }
282✔
511

512
                // Init the map key when needed.
513
                _, ok := entries[l]
1,602✔
514
                if !ok {
3,204✔
515
                        entries[l] = make([]*logEntry, 0, recordsPerTx)
1,602✔
516
                }
1,602✔
517

518
                return iterator(
1,602✔
519
                        logBucket, locator.nextHeight,
1,602✔
520
                        func(k, v []byte) error {
4,428✔
521
                                // Reset the nextHeight for following chan
2,826✔
522
                                // buckets.
2,826✔
523
                                locator.nextHeight = nil
2,826✔
524
                                return innerCb(c, l, k, v)
2,826✔
525
                        },
2,826✔
526
                )
527
        }
528

529
        err := iterateBuckets(openChanBucket, locator, cb)
756✔
530
        // If there's an error and it's not exit signal, we won't collect the
756✔
531
        // logs from the result channels.
756✔
532
        if err != nil && err != errExit {
756✔
533
                return nil, err
×
534
        }
×
535

536
        // Otherwise, collect the logs.
537
        err = collectLogs()
756✔
538

756✔
539
        return entries, err
756✔
540
}
541

542
// convertRevocationLog uses the fields `CommitTx` and `Htlcs` from a
543
// ChannelCommitment to construct a revocation log entry.
544
func convertRevocationLog(commit *mig.ChannelCommitment,
545
        ourOutputIndex, theirOutputIndex uint32,
546
        noAmtData bool) (*RevocationLog, error) {
2,826✔
547

2,826✔
548
        // Sanity check that the output indexes can be safely converted.
2,826✔
549
        if ourOutputIndex > math.MaxUint16 {
2,826✔
550
                return nil, ErrOutputIndexTooBig
×
551
        }
×
552
        if theirOutputIndex > math.MaxUint16 {
2,826✔
553
                return nil, ErrOutputIndexTooBig
×
554
        }
×
555

556
        rl := &RevocationLog{
2,826✔
557
                OurOutputIndex:   uint16(ourOutputIndex),
2,826✔
558
                TheirOutputIndex: uint16(theirOutputIndex),
2,826✔
559
                CommitTxHash:     commit.CommitTx.TxHash(),
2,826✔
560
                HTLCEntries:      make([]*HTLCEntry, 0, len(commit.Htlcs)),
2,826✔
561
        }
2,826✔
562

2,826✔
563
        if !noAmtData {
2,826✔
UNCOV
564
                rl.TheirBalance = &commit.RemoteBalance
×
UNCOV
565
                rl.OurBalance = &commit.LocalBalance
×
UNCOV
566
        }
×
567

568
        for _, htlc := range commit.Htlcs {
4,428✔
569
                // Skip dust HTLCs.
1,602✔
570
                if htlc.OutputIndex < 0 {
1,602✔
571
                        continue
×
572
                }
573

574
                // Sanity check that the output indexes can be safely
575
                // converted.
576
                if htlc.OutputIndex > math.MaxUint16 {
1,602✔
577
                        return nil, ErrOutputIndexTooBig
×
578
                }
×
579

580
                entry := &HTLCEntry{
1,602✔
581
                        RHash:         htlc.RHash,
1,602✔
582
                        RefundTimeout: htlc.RefundTimeout,
1,602✔
583
                        Incoming:      htlc.Incoming,
1,602✔
584
                        OutputIndex:   uint16(htlc.OutputIndex),
1,602✔
585
                        Amt:           htlc.Amt.ToSatoshis(),
1,602✔
586
                }
1,602✔
587
                rl.HTLCEntries = append(rl.HTLCEntries, entry)
1,602✔
588
        }
589

590
        return rl, nil
2,826✔
591
}
592

593
// validateMigration checks that the data saved in the new buckets match those
594
// saved in the old buckets. It does so by checking the last keys saved in both
595
// buckets can match, given the assumption that the `CommitHeight` is
596
// monotonically increased value so the last key represents the total number of
597
// records saved.
598
func validateMigration(tx kvdb.RwTx) error {
868✔
599
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
868✔
600

868✔
601
        // If no bucket is found, we can exit early.
868✔
602
        if openChanBucket == nil {
868✔
603
                return nil
×
604
        }
×
605

606
        // exitWithErr is a helper closure that prepends an error message with
607
        // the locator info.
608
        exitWithErr := func(l *updateLocator, msg string) error {
870✔
609
                return fmt.Errorf("unmatched records found under <nodePub=%x"+
2✔
610
                        ", chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
2✔
611
                        l.chainHash, l.fundingOutpoint, msg)
2✔
612
        }
2✔
613

614
        // cb is the callback function to be used when iterating the buckets.
615
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
3,452✔
616
                // Read both the old and new revocation log buckets.
2,584✔
617
                oldBucket := chanBucket.NestedReadBucket(
2,584✔
618
                        revocationLogBucketDeprecated,
2,584✔
619
                )
2,584✔
620
                newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
2,584✔
621

2,584✔
622
                // Exit early if the old bucket is nil.
2,584✔
623
                //
2,584✔
624
                // NOTE: the new bucket may not be nil here as new logs might
2,584✔
625
                // have been created using lnd@v0.15.0.
2,584✔
626
                if oldBucket == nil {
3,184✔
627
                        return nil
600✔
628
                }
600✔
629

630
                // Return an error if the expected new bucket cannot be found.
631
                if newBucket == nil {
1,984✔
632
                        return exitWithErr(l, "expected new bucket")
×
633
                }
×
634

635
                // Acquire the cursors.
636
                oldCursor := oldBucket.ReadCursor()
1,984✔
637
                newCursor := newBucket.ReadCursor()
1,984✔
638

1,984✔
639
                // Jump to the end of the cursors to do a quick check.
1,984✔
640
                newKey, _ := oldCursor.Last()
1,984✔
641
                oldKey, _ := newCursor.Last()
1,984✔
642

1,984✔
643
                // We expected the CommitHeights to be matched for nodes prior
1,984✔
644
                // to v0.15.0.
1,984✔
645
                if bytes.Equal(newKey, oldKey) {
3,965✔
646
                        return nil
1,981✔
647
                }
1,981✔
648

649
                // If the keys do not match, it's likely the node is running
650
                // v0.15.0 and have new logs created. In this case, we will
651
                // validate that every record in the old bucket can be found in
652
                // the new bucket.
653
                oldKey, _ = oldCursor.First()
3✔
654

3✔
655
                for {
9✔
656
                        // Try to locate the old key in the new bucket and we
6✔
657
                        // expect it to be found.
6✔
658
                        newKey, _ := newCursor.Seek(oldKey)
6✔
659

6✔
660
                        // If the old key is not found in the new bucket,
6✔
661
                        // return an error.
6✔
662
                        //
6✔
663
                        // NOTE: because Seek will return the next key when the
6✔
664
                        // passed key cannot be found, we need to compare the
6✔
665
                        // keys to deicde whether the old key is found or not.
6✔
666
                        if !bytes.Equal(newKey, oldKey) {
8✔
667
                                errMsg := fmt.Sprintf("old bucket has "+
2✔
668
                                        "CommitHeight=%v cannot be found in "+
2✔
669
                                        "new bucket", oldKey)
2✔
670
                                return exitWithErr(l, errMsg)
2✔
671
                        }
2✔
672

673
                        // Otherwise, keep iterating the old bucket.
674
                        oldKey, _ = oldCursor.Next()
4✔
675

4✔
676
                        // If we've done iterating, all keys have been matched
4✔
677
                        // and we can safely exit.
4✔
678
                        if oldKey == nil {
5✔
679
                                return nil
1✔
680
                        }
1✔
681
                }
682
        }
683

684
        return iterateBuckets(openChanBucket, nil, cb)
868✔
685
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc