• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15031268339

14 May 2025 09:15PM UTC coverage: 58.592% (-10.4%) from 68.997%
15031268339

Pull #9801

github

web-flow
Merge 748c3fe22 into b0cba7dd0
Pull Request #9801: peer+lnd: add new CLI option to control if we D/C on slow pongs

5 of 79 new or added lines in 3 files covered. (6.33%)

28199 existing lines in 450 files now uncovered.

97428 of 166282 relevant lines covered (58.59%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration30/migration.go
1
package migration30
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "math"
8
        "sync"
9

10
        mig24 "github.com/lightningnetwork/lnd/channeldb/migration24"
11
        mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
12
        mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
)
15

16
// recordsPerTx specifies the number of records to be migrated in each database
17
// transaction. In the worst case, each old revocation log is 28,057 bytes.
18
// 20,000 records would consume 0.56 GB of ram, which is feasible for a modern
19
// machine.
20
//
21
// NOTE: we could've used more ram but it doesn't help with the speed of the
22
// migration since the most of the CPU time is used for calculating the output
23
// indexes.
24
const recordsPerTx = 20_000
25

26
// MigrateRevLogConfig is an interface that defines the config that should be
27
// passed to the MigrateRevocationLog function.
28
type MigrateRevLogConfig interface {
29
        // GetNoAmountData returns true if the amount data of revoked commitment
30
        // transactions should not be stored in the revocation log.
31
        GetNoAmountData() bool
32
}
33

34
// MigrateRevLogConfigImpl implements the MigrationRevLogConfig interface.
35
type MigrateRevLogConfigImpl struct {
36
        // NoAmountData if set to true will result in the amount data of revoked
37
        // commitment transactions not being stored in the revocation log.
38
        NoAmountData bool
39
}
40

41
// GetNoAmountData returns true if the amount data of revoked commitment
42
// transactions should not be stored in the revocation log.
UNCOV
43
func (c *MigrateRevLogConfigImpl) GetNoAmountData() bool {
×
UNCOV
44
        return c.NoAmountData
×
UNCOV
45
}
×
46

47
// MigrateRevocationLog migrates the old revocation logs into the newer format
48
// and deletes them once finished, with the deletion only happens once ALL the
49
// old logs have been migrates.
UNCOV
50
func MigrateRevocationLog(db kvdb.Backend, cfg MigrateRevLogConfig) error {
×
UNCOV
51
        log.Infof("Migrating revocation logs, might take a while...")
×
UNCOV
52

×
UNCOV
53
        var (
×
UNCOV
54
                err error
×
UNCOV
55

×
UNCOV
56
                // finished is used to exit the for loop.
×
UNCOV
57
                finished bool
×
UNCOV
58

×
UNCOV
59
                // total is the number of total records.
×
UNCOV
60
                total uint64
×
UNCOV
61

×
UNCOV
62
                // migrated is the number of already migrated records.
×
UNCOV
63
                migrated uint64
×
UNCOV
64
        )
×
UNCOV
65

×
UNCOV
66
        // First of all, read the stats of the revocation logs.
×
UNCOV
67
        total, migrated, err = logMigrationStat(db)
×
UNCOV
68
        if err != nil {
×
69
                return err
×
70
        }
×
UNCOV
71
        log.Infof("Total logs=%d, migrated=%d", total, migrated)
×
UNCOV
72

×
UNCOV
73
        // Exit early if the old logs have already been migrated and deleted.
×
UNCOV
74
        if total == 0 {
×
UNCOV
75
                log.Info("Migration already finished!")
×
UNCOV
76
                return nil
×
UNCOV
77
        }
×
78

UNCOV
79
        for {
×
UNCOV
80
                if finished {
×
UNCOV
81
                        log.Infof("Migrating old revocation logs finished, " +
×
UNCOV
82
                                "now checking the migration results...")
×
UNCOV
83
                        break
×
84
                }
85

86
                // Process the migration.
UNCOV
87
                err = kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
88
                        finished, err = processMigration(tx, cfg)
×
UNCOV
89
                        if err != nil {
×
90
                                return err
×
91
                        }
×
UNCOV
92
                        return nil
×
UNCOV
93
                }, func() {})
×
UNCOV
94
                if err != nil {
×
95
                        return err
×
96
                }
×
97

98
                // Each time we finished the above process, we'd read the stats
99
                // again to understand the current progress.
UNCOV
100
                total, migrated, err = logMigrationStat(db)
×
UNCOV
101
                if err != nil {
×
102
                        return err
×
103
                }
×
104

105
                // Calculate and log the progress if the progress is less than
106
                // one.
UNCOV
107
                progress := float64(migrated) / float64(total) * 100
×
UNCOV
108
                if progress >= 100 {
×
UNCOV
109
                        continue
×
110
                }
111

112
                log.Infof("Migration progress: %.3f%%, still have: %d",
×
113
                        progress, total-migrated)
×
114
        }
115

116
        // Before we can safety delete the old buckets, we perform a check to
117
        // make sure the logs are migrated as expected.
UNCOV
118
        err = kvdb.Update(db, validateMigration, func() {})
×
UNCOV
119
        if err != nil {
×
120
                return fmt.Errorf("validate migration failed: %w", err)
×
121
        }
×
122

UNCOV
123
        log.Info("Migration check passed, now deleting the old logs...")
×
UNCOV
124

×
UNCOV
125
        // Once the migration completes, we can now safety delete the old
×
UNCOV
126
        // revocation logs.
×
UNCOV
127
        if err := deleteOldBuckets(db); err != nil {
×
128
                return fmt.Errorf("deleteOldBuckets err: %w", err)
×
129
        }
×
130

UNCOV
131
        log.Info("Old revocation log buckets removed!")
×
UNCOV
132
        return nil
×
133
}
134

135
// processMigration finds the next un-migrated revocation logs, reads a max
136
// number of `recordsPerTx` records, converts them into the new revocation logs
137
// and save them to disk.
UNCOV
138
func processMigration(tx kvdb.RwTx, cfg MigrateRevLogConfig) (bool, error) {
×
UNCOV
139
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
140

×
UNCOV
141
        // If no bucket is found, we can exit early.
×
UNCOV
142
        if openChanBucket == nil {
×
143
                return false, fmt.Errorf("root bucket not found")
×
144
        }
×
145

146
        // Locate the next migration height.
UNCOV
147
        locator, err := locateNextUpdateNum(openChanBucket)
×
UNCOV
148
        if err != nil {
×
149
                return false, fmt.Errorf("locator got error: %w", err)
×
150
        }
×
151

152
        // If the returned locator is nil, we've done migrating the logs.
UNCOV
153
        if locator == nil {
×
UNCOV
154
                return true, nil
×
UNCOV
155
        }
×
156

157
        // Read a list of old revocation logs.
UNCOV
158
        entryMap, err := readOldRevocationLogs(openChanBucket, locator, cfg)
×
UNCOV
159
        if err != nil {
×
160
                return false, fmt.Errorf("read old logs err: %w", err)
×
161
        }
×
162

163
        // Migrate the revocation logs.
UNCOV
164
        return false, writeRevocationLogs(openChanBucket, entryMap)
×
165
}
166

167
// deleteOldBuckets iterates all the channel buckets and deletes the old
168
// revocation buckets.
UNCOV
169
func deleteOldBuckets(db kvdb.Backend) error {
×
UNCOV
170
        // locators records all the chan buckets found in the database.
×
UNCOV
171
        var locators []*updateLocator
×
UNCOV
172

×
UNCOV
173
        // reader is a helper closure that saves the locator found. Each
×
UNCOV
174
        // locator is relatively small(33+32+36+8=109 bytes), assuming 1 GB of
×
UNCOV
175
        // ram we can fit roughly 10 million records. Since each record
×
UNCOV
176
        // corresponds to a channel, we should have more than enough memory to
×
UNCOV
177
        // read them all.
×
UNCOV
178
        reader := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam
×
UNCOV
179
                locators = append(locators, l)
×
UNCOV
180
                return nil
×
UNCOV
181
        }
×
182

183
        // remover is a helper closure that removes the old revocation log
184
        // bucket under the specified chan bucket by the given locator.
UNCOV
185
        remover := func(rootBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
186
                chanBucket, err := l.locateChanBucket(rootBucket)
×
UNCOV
187
                if err != nil {
×
188
                        return err
×
189
                }
×
190

UNCOV
191
                return chanBucket.DeleteNestedBucket(
×
UNCOV
192
                        revocationLogBucketDeprecated,
×
UNCOV
193
                )
×
194
        }
195

196
        // Perform the deletion in one db transaction. This should not cause
197
        // any memory issue as the deletion doesn't load any data from the
198
        // buckets.
UNCOV
199
        return kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
200
                openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
201

×
UNCOV
202
                // Exit early if there's no bucket.
×
UNCOV
203
                if openChanBucket == nil {
×
204
                        return nil
×
205
                }
×
206

207
                // Iterate the buckets to find all the locators.
UNCOV
208
                err := iterateBuckets(openChanBucket, nil, reader)
×
UNCOV
209
                if err != nil {
×
210
                        return err
×
211
                }
×
212

213
                // Iterate the locators and delete all the old revocation log
214
                // buckets.
UNCOV
215
                for _, l := range locators {
×
UNCOV
216
                        err := remover(openChanBucket, l)
×
UNCOV
217
                        // If the bucket doesn't exist, we can exit safety.
×
UNCOV
218
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
219
                                return err
×
220
                        }
×
221
                }
222

UNCOV
223
                return nil
×
UNCOV
224
        }, func() {})
×
225
}
226

227
// writeRevocationLogs unwraps the entryMap and writes the new revocation logs.
228
func writeRevocationLogs(openChanBucket kvdb.RwBucket,
UNCOV
229
        entryMap logEntries) error {
×
UNCOV
230

×
UNCOV
231
        for locator, logs := range entryMap {
×
UNCOV
232
                // Find the channel bucket.
×
UNCOV
233
                chanBucket, err := locator.locateChanBucket(openChanBucket)
×
UNCOV
234
                if err != nil {
×
235
                        return fmt.Errorf("locateChanBucket err: %w", err)
×
236
                }
×
237

238
                // Create the new log bucket.
UNCOV
239
                logBucket, err := chanBucket.CreateBucketIfNotExists(
×
UNCOV
240
                        revocationLogBucket,
×
UNCOV
241
                )
×
UNCOV
242
                if err != nil {
×
243
                        return fmt.Errorf("create log bucket err: %w", err)
×
244
                }
×
245

246
                // Write the new logs.
UNCOV
247
                for _, entry := range logs {
×
UNCOV
248
                        var b bytes.Buffer
×
UNCOV
249
                        err := serializeRevocationLog(&b, entry.log)
×
UNCOV
250
                        if err != nil {
×
251
                                return err
×
252
                        }
×
253

UNCOV
254
                        logEntrykey := mig24.MakeLogKey(entry.commitHeight)
×
UNCOV
255
                        err = logBucket.Put(logEntrykey[:], b.Bytes())
×
UNCOV
256
                        if err != nil {
×
257
                                return fmt.Errorf("putRevocationLog err: %w",
×
258
                                        err)
×
259
                        }
×
260
                }
261
        }
262

UNCOV
263
        return nil
×
264
}
265

266
// logMigrationStat reads the buckets to provide stats over current migration
267
// progress. The returned values are the numbers of total records and already
268
// migrated records.
UNCOV
269
func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) {
×
UNCOV
270
        var (
×
UNCOV
271
                err error
×
UNCOV
272

×
UNCOV
273
                // total is the number of total records.
×
UNCOV
274
                total uint64
×
UNCOV
275

×
UNCOV
276
                // unmigrated is the number of unmigrated records.
×
UNCOV
277
                unmigrated uint64
×
UNCOV
278
        )
×
UNCOV
279

×
UNCOV
280
        err = kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
281
                total, unmigrated, err = fetchLogStats(tx)
×
UNCOV
282
                return err
×
UNCOV
283
        }, func() {})
×
284

UNCOV
285
        log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated)
×
UNCOV
286
        return total, total - unmigrated, err
×
287
}
288

289
// fetchLogStats iterates all the chan buckets to provide stats about the logs.
290
// The returned values are num of total records, and num of un-migrated
291
// records.
UNCOV
292
func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) {
×
UNCOV
293
        var (
×
UNCOV
294
                total           uint64
×
UNCOV
295
                totalUnmigrated uint64
×
UNCOV
296
        )
×
UNCOV
297

×
UNCOV
298
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
299

×
UNCOV
300
        // If no bucket is found, we can exit early.
×
UNCOV
301
        if openChanBucket == nil {
×
302
                return 0, 0, fmt.Errorf("root bucket not found")
×
303
        }
×
304

305
        // counter is a helper closure used to count the number of records
306
        // based on the given bucket.
UNCOV
307
        counter := func(chanBucket kvdb.RwBucket, bucket []byte) uint64 {
×
UNCOV
308
                // Read the sub-bucket level 4.
×
UNCOV
309
                logBucket := chanBucket.NestedReadBucket(bucket)
×
UNCOV
310

×
UNCOV
311
                // Exit early if we don't have the bucket.
×
UNCOV
312
                if logBucket == nil {
×
UNCOV
313
                        return 0
×
UNCOV
314
                }
×
315

316
                // Jump to the end of the cursor.
UNCOV
317
                key, _ := logBucket.ReadCursor().Last()
×
UNCOV
318

×
UNCOV
319
                // Since the CommitHeight is a zero-based monotonically
×
UNCOV
320
                // increased index, its value plus one reflects the total
×
UNCOV
321
                // records under this chan bucket.
×
UNCOV
322
                lastHeight := binary.BigEndian.Uint64(key) + 1
×
UNCOV
323

×
UNCOV
324
                return lastHeight
×
325
        }
326

327
        // countTotal is a callback function used to count the total number of
328
        // records.
UNCOV
329
        countTotal := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
330
                total += counter(chanBucket, revocationLogBucketDeprecated)
×
UNCOV
331
                return nil
×
UNCOV
332
        }
×
333

334
        // countUnmigrated is a callback function used to count the total
335
        // number of un-migrated records.
UNCOV
336
        countUnmigrated := func(chanBucket kvdb.RwBucket,
×
UNCOV
337
                l *updateLocator) error {
×
UNCOV
338

×
UNCOV
339
                totalUnmigrated += counter(
×
UNCOV
340
                        chanBucket, revocationLogBucketDeprecated,
×
UNCOV
341
                )
×
UNCOV
342
                return nil
×
UNCOV
343
        }
×
344

345
        // Locate the next migration height.
UNCOV
346
        locator, err := locateNextUpdateNum(openChanBucket)
×
UNCOV
347
        if err != nil {
×
348
                return 0, 0, fmt.Errorf("locator got error: %w", err)
×
349
        }
×
350

351
        // If the returned locator is not nil, we still have un-migrated
352
        // records so we need to count them. Otherwise we've done migrating the
353
        // logs.
UNCOV
354
        if locator != nil {
×
UNCOV
355
                err = iterateBuckets(openChanBucket, locator, countUnmigrated)
×
UNCOV
356
                if err != nil {
×
357
                        return 0, 0, err
×
358
                }
×
359
        }
360

361
        // Count the total number of records by supplying a nil locator.
UNCOV
362
        err = iterateBuckets(openChanBucket, nil, countTotal)
×
UNCOV
363
        if err != nil {
×
364
                return 0, 0, err
×
365
        }
×
366

UNCOV
367
        return total, totalUnmigrated, err
×
368
}
369

370
// logEntry houses the info needed to write a new revocation log.
371
type logEntry struct {
372
        log          *RevocationLog
373
        commitHeight uint64
374
        ourIndex     uint32
375
        theirIndex   uint32
376
        locator      *updateLocator
377
}
378

379
// logEntries maps a bucket locator to a list of entries under that bucket.
380
type logEntries map[*updateLocator][]*logEntry
381

382
// result is made of two channels that's used to send back the constructed new
383
// revocation log or an error.
384
type result struct {
385
        newLog  chan *logEntry
386
        errChan chan error
387
}
388

389
// readOldRevocationLogs finds a list of old revocation logs and converts them
390
// into the new revocation logs.
391
func readOldRevocationLogs(openChanBucket kvdb.RwBucket,
UNCOV
392
        locator *updateLocator, cfg MigrateRevLogConfig) (logEntries, error) {
×
UNCOV
393

×
UNCOV
394
        entries := make(logEntries)
×
UNCOV
395
        results := make([]*result, 0)
×
UNCOV
396

×
UNCOV
397
        var wg sync.WaitGroup
×
UNCOV
398

×
UNCOV
399
        // collectLogs is a helper closure that reads all newly created
×
UNCOV
400
        // revocation logs sent over the result channels.
×
UNCOV
401
        //
×
UNCOV
402
        // NOTE: the order of the logs cannot be guaranteed, which is fine as
×
UNCOV
403
        // boltdb will take care of the orders when saving them.
×
UNCOV
404
        collectLogs := func() error {
×
UNCOV
405
                wg.Wait()
×
UNCOV
406

×
UNCOV
407
                for _, r := range results {
×
UNCOV
408
                        select {
×
UNCOV
409
                        case entry := <-r.newLog:
×
UNCOV
410
                                entries[entry.locator] = append(
×
UNCOV
411
                                        entries[entry.locator], entry,
×
UNCOV
412
                                )
×
413

414
                        case err := <-r.errChan:
×
415
                                return err
×
416
                        }
417
                }
418

UNCOV
419
                return nil
×
420
        }
421

422
        // createLog is a helper closure that constructs a new revocation log.
423
        //
424
        // NOTE: used as a goroutine.
UNCOV
425
        createLog := func(chanState *mig26.OpenChannel,
×
UNCOV
426
                c mig.ChannelCommitment, l *updateLocator, r *result) {
×
UNCOV
427

×
UNCOV
428
                defer wg.Done()
×
UNCOV
429

×
UNCOV
430
                // Find the output indexes.
×
UNCOV
431
                ourIndex, theirIndex, err := findOutputIndexes(chanState, &c)
×
UNCOV
432
                if err != nil {
×
433
                        r.errChan <- err
×
434
                }
×
435

436
                // Convert the old logs into the new logs. We do this early in
437
                // the read tx so the old large revocation log can be set to
438
                // nil here so save us some memory space.
UNCOV
439
                newLog, err := convertRevocationLog(
×
UNCOV
440
                        &c, ourIndex, theirIndex, cfg.GetNoAmountData(),
×
UNCOV
441
                )
×
UNCOV
442
                if err != nil {
×
443
                        r.errChan <- err
×
444
                }
×
445
                // Create the entry that will be used to create the new log.
UNCOV
446
                entry := &logEntry{
×
UNCOV
447
                        log:          newLog,
×
UNCOV
448
                        commitHeight: c.CommitHeight,
×
UNCOV
449
                        ourIndex:     ourIndex,
×
UNCOV
450
                        theirIndex:   theirIndex,
×
UNCOV
451
                        locator:      l,
×
UNCOV
452
                }
×
UNCOV
453

×
UNCOV
454
                r.newLog <- entry
×
455
        }
456

457
        // innerCb is the stepping function used when iterating the old log
458
        // bucket.
UNCOV
459
        innerCb := func(chanState *mig26.OpenChannel, l *updateLocator,
×
UNCOV
460
                _, v []byte) error {
×
UNCOV
461

×
UNCOV
462
                reader := bytes.NewReader(v)
×
UNCOV
463
                c, err := mig.DeserializeChanCommit(reader)
×
UNCOV
464
                if err != nil {
×
465
                        return err
×
466
                }
×
467

UNCOV
468
                r := &result{
×
UNCOV
469
                        newLog:  make(chan *logEntry, 1),
×
UNCOV
470
                        errChan: make(chan error, 1),
×
UNCOV
471
                }
×
UNCOV
472
                results = append(results, r)
×
UNCOV
473

×
UNCOV
474
                // We perform the log creation in a goroutine as it takes some
×
UNCOV
475
                // time to compute and find output indexes.
×
UNCOV
476
                wg.Add(1)
×
UNCOV
477
                go createLog(chanState, c, l, r)
×
UNCOV
478

×
UNCOV
479
                // Check the records read so far and signals exit when we've
×
UNCOV
480
                // reached our memory cap.
×
UNCOV
481
                if len(results) >= recordsPerTx {
×
482
                        return errExit
×
483
                }
×
484

UNCOV
485
                return nil
×
486
        }
487

488
        // cb is the callback function to be used when iterating the buckets.
UNCOV
489
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
490
                // Read the open channel.
×
UNCOV
491
                c := &mig26.OpenChannel{}
×
UNCOV
492
                err := mig26.FetchChanInfo(chanBucket, c, false)
×
UNCOV
493
                if err != nil {
×
494
                        return fmt.Errorf("unable to fetch chan info: %w", err)
×
495
                }
×
496

UNCOV
497
                err = fetchChanRevocationState(chanBucket, c)
×
UNCOV
498
                if err != nil {
×
499
                        return fmt.Errorf("unable to fetch revocation "+
×
500
                                "state: %v", err)
×
501
                }
×
502

503
                // Read the sub-bucket level 4.
UNCOV
504
                logBucket := chanBucket.NestedReadBucket(
×
UNCOV
505
                        revocationLogBucketDeprecated,
×
UNCOV
506
                )
×
UNCOV
507
                // Exit early if we don't have the old bucket.
×
UNCOV
508
                if logBucket == nil {
×
UNCOV
509
                        return nil
×
UNCOV
510
                }
×
511

512
                // Init the map key when needed.
UNCOV
513
                _, ok := entries[l]
×
UNCOV
514
                if !ok {
×
UNCOV
515
                        entries[l] = make([]*logEntry, 0, recordsPerTx)
×
UNCOV
516
                }
×
517

UNCOV
518
                return iterator(
×
UNCOV
519
                        logBucket, locator.nextHeight,
×
UNCOV
520
                        func(k, v []byte) error {
×
UNCOV
521
                                // Reset the nextHeight for following chan
×
UNCOV
522
                                // buckets.
×
UNCOV
523
                                locator.nextHeight = nil
×
UNCOV
524
                                return innerCb(c, l, k, v)
×
UNCOV
525
                        },
×
526
                )
527
        }
528

UNCOV
529
        err := iterateBuckets(openChanBucket, locator, cb)
×
UNCOV
530
        // If there's an error and it's not exit signal, we won't collect the
×
UNCOV
531
        // logs from the result channels.
×
UNCOV
532
        if err != nil && err != errExit {
×
533
                return nil, err
×
534
        }
×
535

536
        // Otherwise, collect the logs.
UNCOV
537
        err = collectLogs()
×
UNCOV
538

×
UNCOV
539
        return entries, err
×
540
}
541

542
// convertRevocationLog uses the fields `CommitTx` and `Htlcs` from a
543
// ChannelCommitment to construct a revocation log entry.
544
func convertRevocationLog(commit *mig.ChannelCommitment,
545
        ourOutputIndex, theirOutputIndex uint32,
UNCOV
546
        noAmtData bool) (*RevocationLog, error) {
×
UNCOV
547

×
UNCOV
548
        // Sanity check that the output indexes can be safely converted.
×
UNCOV
549
        if ourOutputIndex > math.MaxUint16 {
×
550
                return nil, ErrOutputIndexTooBig
×
551
        }
×
UNCOV
552
        if theirOutputIndex > math.MaxUint16 {
×
553
                return nil, ErrOutputIndexTooBig
×
554
        }
×
555

UNCOV
556
        rl := &RevocationLog{
×
UNCOV
557
                OurOutputIndex:   uint16(ourOutputIndex),
×
UNCOV
558
                TheirOutputIndex: uint16(theirOutputIndex),
×
UNCOV
559
                CommitTxHash:     commit.CommitTx.TxHash(),
×
UNCOV
560
                HTLCEntries:      make([]*HTLCEntry, 0, len(commit.Htlcs)),
×
UNCOV
561
        }
×
UNCOV
562

×
UNCOV
563
        if !noAmtData {
×
564
                rl.TheirBalance = &commit.RemoteBalance
×
565
                rl.OurBalance = &commit.LocalBalance
×
566
        }
×
567

UNCOV
568
        for _, htlc := range commit.Htlcs {
×
UNCOV
569
                // Skip dust HTLCs.
×
UNCOV
570
                if htlc.OutputIndex < 0 {
×
571
                        continue
×
572
                }
573

574
                // Sanity check that the output indexes can be safely
575
                // converted.
UNCOV
576
                if htlc.OutputIndex > math.MaxUint16 {
×
577
                        return nil, ErrOutputIndexTooBig
×
578
                }
×
579

UNCOV
580
                entry := &HTLCEntry{
×
UNCOV
581
                        RHash:         htlc.RHash,
×
UNCOV
582
                        RefundTimeout: htlc.RefundTimeout,
×
UNCOV
583
                        Incoming:      htlc.Incoming,
×
UNCOV
584
                        OutputIndex:   uint16(htlc.OutputIndex),
×
UNCOV
585
                        Amt:           htlc.Amt.ToSatoshis(),
×
UNCOV
586
                }
×
UNCOV
587
                rl.HTLCEntries = append(rl.HTLCEntries, entry)
×
588
        }
589

UNCOV
590
        return rl, nil
×
591
}
592

593
// validateMigration checks that the data saved in the new buckets match those
594
// saved in the old buckets. It does so by checking the last keys saved in both
595
// buckets can match, given the assumption that the `CommitHeight` is
596
// monotonically increased value so the last key represents the total number of
597
// records saved.
UNCOV
598
func validateMigration(tx kvdb.RwTx) error {
×
UNCOV
599
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
600

×
UNCOV
601
        // If no bucket is found, we can exit early.
×
UNCOV
602
        if openChanBucket == nil {
×
603
                return nil
×
604
        }
×
605

606
        // exitWithErr is a helper closure that prepends an error message with
607
        // the locator info.
UNCOV
608
        exitWithErr := func(l *updateLocator, msg string) error {
×
UNCOV
609
                return fmt.Errorf("unmatched records found under <nodePub=%x"+
×
UNCOV
610
                        ", chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
×
UNCOV
611
                        l.chainHash, l.fundingOutpoint, msg)
×
UNCOV
612
        }
×
613

614
        // cb is the callback function to be used when iterating the buckets.
UNCOV
615
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
616
                // Read both the old and new revocation log buckets.
×
UNCOV
617
                oldBucket := chanBucket.NestedReadBucket(
×
UNCOV
618
                        revocationLogBucketDeprecated,
×
UNCOV
619
                )
×
UNCOV
620
                newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
×
UNCOV
621

×
UNCOV
622
                // Exit early if the old bucket is nil.
×
UNCOV
623
                //
×
UNCOV
624
                // NOTE: the new bucket may not be nil here as new logs might
×
UNCOV
625
                // have been created using lnd@v0.15.0.
×
UNCOV
626
                if oldBucket == nil {
×
UNCOV
627
                        return nil
×
UNCOV
628
                }
×
629

630
                // Return an error if the expected new bucket cannot be found.
UNCOV
631
                if newBucket == nil {
×
632
                        return exitWithErr(l, "expected new bucket")
×
633
                }
×
634

635
                // Acquire the cursors.
UNCOV
636
                oldCursor := oldBucket.ReadCursor()
×
UNCOV
637
                newCursor := newBucket.ReadCursor()
×
UNCOV
638

×
UNCOV
639
                // Jump to the end of the cursors to do a quick check.
×
UNCOV
640
                newKey, _ := oldCursor.Last()
×
UNCOV
641
                oldKey, _ := newCursor.Last()
×
UNCOV
642

×
UNCOV
643
                // We expected the CommitHeights to be matched for nodes prior
×
UNCOV
644
                // to v0.15.0.
×
UNCOV
645
                if bytes.Equal(newKey, oldKey) {
×
UNCOV
646
                        return nil
×
UNCOV
647
                }
×
648

649
                // If the keys do not match, it's likely the node is running
650
                // v0.15.0 and have new logs created. In this case, we will
651
                // validate that every record in the old bucket can be found in
652
                // the new bucket.
UNCOV
653
                oldKey, _ = oldCursor.First()
×
UNCOV
654

×
UNCOV
655
                for {
×
UNCOV
656
                        // Try to locate the old key in the new bucket and we
×
UNCOV
657
                        // expect it to be found.
×
UNCOV
658
                        newKey, _ := newCursor.Seek(oldKey)
×
UNCOV
659

×
UNCOV
660
                        // If the old key is not found in the new bucket,
×
UNCOV
661
                        // return an error.
×
UNCOV
662
                        //
×
UNCOV
663
                        // NOTE: because Seek will return the next key when the
×
UNCOV
664
                        // passed key cannot be found, we need to compare the
×
UNCOV
665
                        // keys to deicde whether the old key is found or not.
×
UNCOV
666
                        if !bytes.Equal(newKey, oldKey) {
×
UNCOV
667
                                errMsg := fmt.Sprintf("old bucket has "+
×
UNCOV
668
                                        "CommitHeight=%v cannot be found in "+
×
UNCOV
669
                                        "new bucket", oldKey)
×
UNCOV
670
                                return exitWithErr(l, errMsg)
×
UNCOV
671
                        }
×
672

673
                        // Otherwise, keep iterating the old bucket.
UNCOV
674
                        oldKey, _ = oldCursor.Next()
×
UNCOV
675

×
UNCOV
676
                        // If we've done iterating, all keys have been matched
×
UNCOV
677
                        // and we can safely exit.
×
UNCOV
678
                        if oldKey == nil {
×
UNCOV
679
                                return nil
×
UNCOV
680
                        }
×
681
                }
682
        }
683

UNCOV
684
        return iterateBuckets(openChanBucket, nil, cb)
×
685
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc