• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration30/migration.go
1
package migration30
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "math"
8
        "sync"
9

10
        mig24 "github.com/lightningnetwork/lnd/channeldb/migration24"
11
        mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
12
        mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
)
15

16
// recordsPerTx specifies the number of records to be migrated in each database
17
// transaction. In the worst case, each old revocation log is 28,057 bytes.
18
// 20,000 records would consume 0.56 GB of ram, which is feasible for a modern
19
// machine.
20
//
21
// NOTE: we could've used more ram but it doesn't help with the speed of the
22
// migration since the most of the CPU time is used for calculating the output
23
// indexes.
24
const recordsPerTx = 20_000
25

26
// MigrateRevLogConfig is an interface that defines the config that should be
27
// passed to the MigrateRevocationLog function.
28
type MigrateRevLogConfig interface {
29
        // GetNoAmountData returns true if the amount data of revoked commitment
30
        // transactions should not be stored in the revocation log.
31
        GetNoAmountData() bool
32
}
33

34
// MigrateRevLogConfigImpl implements the MigrationRevLogConfig interface.
35
type MigrateRevLogConfigImpl struct {
36
        // NoAmountData if set to true will result in the amount data of revoked
37
        // commitment transactions not being stored in the revocation log.
38
        NoAmountData bool
39
}
40

41
// GetNoAmountData returns true if the amount data of revoked commitment
42
// transactions should not be stored in the revocation log.
UNCOV
43
func (c *MigrateRevLogConfigImpl) GetNoAmountData() bool {
×
UNCOV
44
        return c.NoAmountData
×
UNCOV
45
}
×
46

47
// MigrateRevocationLog migrates the old revocation logs into the newer format
48
// and deletes them once finished, with the deletion only happens once ALL the
49
// old logs have been migrates.
UNCOV
50
func MigrateRevocationLog(db kvdb.Backend, cfg MigrateRevLogConfig) error {
×
UNCOV
51
        log.Infof("Migrating revocation logs, might take a while...")
×
UNCOV
52

×
UNCOV
53
        var (
×
UNCOV
54
                err error
×
UNCOV
55

×
UNCOV
56
                // finished is used to exit the for loop.
×
UNCOV
57
                finished bool
×
UNCOV
58

×
UNCOV
59
                // total is the number of total records.
×
UNCOV
60
                total uint64
×
UNCOV
61

×
UNCOV
62
                // migrated is the number of already migrated records.
×
UNCOV
63
                migrated uint64
×
UNCOV
64
        )
×
UNCOV
65

×
UNCOV
66
        // First of all, read the stats of the revocation logs.
×
UNCOV
67
        total, migrated, err = logMigrationStat(db)
×
UNCOV
68
        if err != nil {
×
69
                return err
×
70
        }
×
UNCOV
71
        log.Infof("Total logs=%d, migrated=%d", total, migrated)
×
UNCOV
72

×
UNCOV
73
        // Exit early if the old logs have already been migrated and deleted.
×
UNCOV
74
        if total == 0 {
×
UNCOV
75
                log.Info("Migration already finished!")
×
UNCOV
76
                return nil
×
UNCOV
77
        }
×
78

UNCOV
79
        for {
×
UNCOV
80
                if finished {
×
UNCOV
81
                        log.Infof("Migrating old revocation logs finished, " +
×
UNCOV
82
                                "now checking the migration results...")
×
UNCOV
83
                        break
×
84
                }
85

86
                // Process the migration.
UNCOV
87
                err = kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
88
                        finished, err = processMigration(tx, cfg)
×
UNCOV
89
                        if err != nil {
×
90
                                return err
×
91
                        }
×
UNCOV
92
                        return nil
×
UNCOV
93
                }, func() {})
×
UNCOV
94
                if err != nil {
×
95
                        return err
×
96
                }
×
97

98
                // Each time we finished the above process, we'd read the stats
99
                // again to understand the current progress.
UNCOV
100
                total, migrated, err = logMigrationStat(db)
×
UNCOV
101
                if err != nil {
×
102
                        return err
×
103
                }
×
104

105
                // Calculate and log the progress if the progress is less than
106
                // one.
UNCOV
107
                progress := float64(migrated) / float64(total) * 100
×
UNCOV
108
                if progress >= 100 {
×
UNCOV
109
                        continue
×
110
                }
111

112
                log.Infof("Migration progress: %.3f%%, still have: %d",
×
113
                        progress, total-migrated)
×
114
        }
115

116
        // Before we can safety delete the old buckets, we perform a check to
117
        // make sure the logs are migrated as expected.
UNCOV
118
        err = kvdb.Update(db, validateMigration, func() {})
×
UNCOV
119
        if err != nil {
×
120
                return fmt.Errorf("validate migration failed: %w", err)
×
121
        }
×
122

UNCOV
123
        log.Info("Migration check passed, now deleting the old logs...")
×
UNCOV
124

×
UNCOV
125
        // Once the migration completes, we can now safety delete the old
×
UNCOV
126
        // revocation logs.
×
UNCOV
127
        if err := deleteOldBuckets(db); err != nil {
×
128
                return fmt.Errorf("deleteOldBuckets err: %w", err)
×
129
        }
×
130

UNCOV
131
        log.Info("Old revocation log buckets removed!")
×
UNCOV
132
        return nil
×
133
}
134

135
// processMigration finds the next un-migrated revocation logs, reads a max
136
// number of `recordsPerTx` records, converts them into the new revocation logs
137
// and save them to disk.
UNCOV
138
func processMigration(tx kvdb.RwTx, cfg MigrateRevLogConfig) (bool, error) {
×
UNCOV
139
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
140

×
UNCOV
141
        // If no bucket is found, we can exit early.
×
UNCOV
142
        if openChanBucket == nil {
×
143
                return false, fmt.Errorf("root bucket not found")
×
144
        }
×
145

146
        // Locate the next migration height.
UNCOV
147
        locator, err := locateNextUpdateNum(openChanBucket)
×
UNCOV
148
        if err != nil {
×
149
                return false, fmt.Errorf("locator got error: %w", err)
×
150
        }
×
151

152
        // If the returned locator is nil, we've done migrating the logs.
UNCOV
153
        if locator == nil {
×
UNCOV
154
                return true, nil
×
UNCOV
155
        }
×
156

157
        // Read a list of old revocation logs.
UNCOV
158
        entryMap, err := readOldRevocationLogs(openChanBucket, locator, cfg)
×
UNCOV
159
        if err != nil {
×
160
                return false, fmt.Errorf("read old logs err: %w", err)
×
161
        }
×
162

163
        // Migrate the revocation logs.
UNCOV
164
        return false, writeRevocationLogs(openChanBucket, entryMap)
×
165
}
166

167
// deleteOldBuckets iterates all the channel buckets and deletes the old
168
// revocation buckets.
UNCOV
169
func deleteOldBuckets(db kvdb.Backend) error {
×
UNCOV
170
        // locators records all the chan buckets found in the database.
×
UNCOV
171
        var locators []*updateLocator
×
UNCOV
172

×
UNCOV
173
        // reader is a helper closure that saves the locator found. Each
×
UNCOV
174
        // locator is relatively small(33+32+36+8=109 bytes), assuming 1 GB of
×
UNCOV
175
        // ram we can fit roughly 10 million records. Since each record
×
UNCOV
176
        // corresponds to a channel, we should have more than enough memory to
×
UNCOV
177
        // read them all.
×
UNCOV
178
        reader := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam
×
UNCOV
179
                locators = append(locators, l)
×
UNCOV
180
                return nil
×
UNCOV
181
        }
×
182

183
        // remover is a helper closure that removes the old revocation log
184
        // bucket under the specified chan bucket by the given locator.
UNCOV
185
        remover := func(rootBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
186
                chanBucket, err := l.locateChanBucket(rootBucket)
×
UNCOV
187
                if err != nil {
×
188
                        return err
×
189
                }
×
190

UNCOV
191
                return chanBucket.DeleteNestedBucket(
×
UNCOV
192
                        revocationLogBucketDeprecated,
×
UNCOV
193
                )
×
194
        }
195

196
        // Perform the deletion in one db transaction. This should not cause
197
        // any memory issue as the deletion doesn't load any data from the
198
        // buckets.
UNCOV
199
        return kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
200
                openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
201

×
UNCOV
202
                // Exit early if there's no bucket.
×
UNCOV
203
                if openChanBucket == nil {
×
204
                        return nil
×
205
                }
×
206

207
                // Iterate the buckets to find all the locators.
UNCOV
208
                err := iterateBuckets(openChanBucket, nil, reader)
×
UNCOV
209
                if err != nil {
×
210
                        return err
×
211
                }
×
212

213
                // Iterate the locators and delete all the old revocation log
214
                // buckets.
UNCOV
215
                for _, l := range locators {
×
UNCOV
216
                        err := remover(openChanBucket, l)
×
UNCOV
217
                        // If the bucket doesn't exist, we can exit safety.
×
UNCOV
218
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
219
                                return err
×
220
                        }
×
221
                }
222

UNCOV
223
                return nil
×
UNCOV
224
        }, func() {})
×
225
}
226

227
// writeRevocationLogs unwraps the entryMap and writes the new revocation logs.
228
func writeRevocationLogs(openChanBucket kvdb.RwBucket,
UNCOV
229
        entryMap logEntries) error {
×
UNCOV
230

×
UNCOV
231
        for locator, logs := range entryMap {
×
UNCOV
232
                // Find the channel bucket.
×
UNCOV
233
                chanBucket, err := locator.locateChanBucket(openChanBucket)
×
UNCOV
234
                if err != nil {
×
235
                        return fmt.Errorf("locateChanBucket err: %w", err)
×
236
                }
×
237

238
                // Create the new log bucket.
UNCOV
239
                logBucket, err := chanBucket.CreateBucketIfNotExists(
×
UNCOV
240
                        revocationLogBucket,
×
UNCOV
241
                )
×
UNCOV
242
                if err != nil {
×
243
                        return fmt.Errorf("create log bucket err: %w", err)
×
244
                }
×
245

246
                // Write the new logs.
UNCOV
247
                for _, entry := range logs {
×
UNCOV
248
                        var b bytes.Buffer
×
UNCOV
249
                        err := serializeRevocationLog(&b, entry.log)
×
UNCOV
250
                        if err != nil {
×
251
                                return err
×
252
                        }
×
253

UNCOV
254
                        logEntrykey := mig24.MakeLogKey(entry.commitHeight)
×
UNCOV
255
                        err = logBucket.Put(logEntrykey[:], b.Bytes())
×
UNCOV
256
                        if err != nil {
×
257
                                return fmt.Errorf("putRevocationLog err: %w",
×
258
                                        err)
×
259
                        }
×
260
                }
261
        }
262

UNCOV
263
        return nil
×
264
}
265

266
// logMigrationStat reads the buckets to provide stats over current migration
267
// progress. The returned values are the numbers of total records and already
268
// migrated records.
UNCOV
269
func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) {
×
UNCOV
270
        var (
×
UNCOV
271
                err error
×
UNCOV
272

×
UNCOV
273
                // total is the number of total records.
×
UNCOV
274
                total uint64
×
UNCOV
275

×
UNCOV
276
                // unmigrated is the number of unmigrated records.
×
UNCOV
277
                unmigrated uint64
×
UNCOV
278
        )
×
UNCOV
279

×
UNCOV
280
        err = kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
281
                total, unmigrated, err = fetchLogStats(tx)
×
UNCOV
282
                return err
×
UNCOV
283
        }, func() {})
×
284

UNCOV
285
        log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated)
×
UNCOV
286
        return total, total - unmigrated, err
×
287
}
288

289
// fetchLogStats iterates all the chan buckets to provide stats about the logs.
290
// The returned values are num of total records, and num of un-migrated
291
// records.
UNCOV
292
func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) {
×
UNCOV
293
        var (
×
UNCOV
294
                total           uint64
×
UNCOV
295
                totalUnmigrated uint64
×
UNCOV
296
        )
×
UNCOV
297

×
UNCOV
298
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
299

×
UNCOV
300
        // If no bucket is found, we can exit early.
×
UNCOV
301
        if openChanBucket == nil {
×
302
                return 0, 0, fmt.Errorf("root bucket not found")
×
303
        }
×
304

305
        // counter is a helper closure used to count the number of records
306
        // based on the given bucket.
UNCOV
307
        counter := func(chanBucket kvdb.RwBucket, bucket []byte) uint64 {
×
UNCOV
308
                // Read the sub-bucket level 4.
×
UNCOV
309
                logBucket := chanBucket.NestedReadBucket(bucket)
×
UNCOV
310

×
UNCOV
311
                // Exit early if we don't have the bucket.
×
UNCOV
312
                if logBucket == nil {
×
UNCOV
313
                        return 0
×
UNCOV
314
                }
×
315

316
                // Jump to the end of the cursor.
UNCOV
317
                key, _ := logBucket.ReadCursor().Last()
×
UNCOV
318

×
UNCOV
319
                // Since the CommitHeight is a zero-based monotonically
×
UNCOV
320
                // increased index, its value plus one reflects the total
×
UNCOV
321
                // records under this chan bucket.
×
UNCOV
322
                lastHeight := binary.BigEndian.Uint64(key) + 1
×
UNCOV
323

×
UNCOV
324
                return lastHeight
×
325
        }
326

327
        // countTotal is a callback function used to count the total number of
328
        // records.
UNCOV
329
        countTotal := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
330
                total += counter(chanBucket, revocationLogBucketDeprecated)
×
UNCOV
331
                return nil
×
UNCOV
332
        }
×
333

334
        // countUnmigrated is a callback function used to count the total
335
        // number of un-migrated records.
UNCOV
336
        countUnmigrated := func(chanBucket kvdb.RwBucket,
×
UNCOV
337
                l *updateLocator) error {
×
UNCOV
338

×
UNCOV
339
                totalUnmigrated += counter(
×
UNCOV
340
                        chanBucket, revocationLogBucketDeprecated,
×
UNCOV
341
                )
×
UNCOV
342
                return nil
×
UNCOV
343
        }
×
344

345
        // Locate the next migration height.
UNCOV
346
        locator, err := locateNextUpdateNum(openChanBucket)
×
UNCOV
347
        if err != nil {
×
348
                return 0, 0, fmt.Errorf("locator got error: %w", err)
×
349
        }
×
350

351
        // If the returned locator is not nil, we still have un-migrated
352
        // records so we need to count them. Otherwise we've done migrating the
353
        // logs.
UNCOV
354
        if locator != nil {
×
UNCOV
355
                err = iterateBuckets(openChanBucket, locator, countUnmigrated)
×
UNCOV
356
                if err != nil {
×
357
                        return 0, 0, err
×
358
                }
×
359
        }
360

361
        // Count the total number of records by supplying a nil locator.
UNCOV
362
        err = iterateBuckets(openChanBucket, nil, countTotal)
×
UNCOV
363
        if err != nil {
×
364
                return 0, 0, err
×
365
        }
×
366

UNCOV
367
        return total, totalUnmigrated, err
×
368
}
369

370
// logEntry houses the info needed to write a new revocation log.
371
type logEntry struct {
372
        log          *RevocationLog
373
        commitHeight uint64
374
        ourIndex     uint32
375
        theirIndex   uint32
376
        locator      *updateLocator
377
}
378

379
// logEntries maps a bucket locator to a list of entries under that bucket.
380
type logEntries map[*updateLocator][]*logEntry
381

382
// result is made of two channels that's used to send back the constructed new
383
// revocation log or an error.
384
type result struct {
385
        newLog  chan *logEntry
386
        errChan chan error
387
}
388

389
// readOldRevocationLogs finds a list of old revocation logs and converts them
390
// into the new revocation logs.
391
func readOldRevocationLogs(openChanBucket kvdb.RwBucket,
UNCOV
392
        locator *updateLocator, cfg MigrateRevLogConfig) (logEntries, error) {
×
UNCOV
393

×
UNCOV
394
        entries := make(logEntries)
×
UNCOV
395
        results := make([]*result, 0)
×
UNCOV
396

×
UNCOV
397
        var wg sync.WaitGroup
×
UNCOV
398

×
UNCOV
399
        // collectLogs is a helper closure that reads all newly created
×
UNCOV
400
        // revocation logs sent over the result channels.
×
UNCOV
401
        //
×
UNCOV
402
        // NOTE: the order of the logs cannot be guaranteed, which is fine as
×
UNCOV
403
        // boltdb will take care of the orders when saving them.
×
UNCOV
404
        collectLogs := func() error {
×
UNCOV
405
                wg.Wait()
×
UNCOV
406

×
UNCOV
407
                for _, r := range results {
×
UNCOV
408
                        select {
×
UNCOV
409
                        case entry := <-r.newLog:
×
UNCOV
410
                                entries[entry.locator] = append(
×
UNCOV
411
                                        entries[entry.locator], entry,
×
UNCOV
412
                                )
×
413

414
                        case err := <-r.errChan:
×
415
                                return err
×
416
                        }
417
                }
418

UNCOV
419
                return nil
×
420
        }
421

422
        // createLog is a helper closure that constructs a new revocation log.
423
        //
424
        // NOTE: used as a goroutine.
UNCOV
425
        createLog := func(chanState *mig26.OpenChannel,
×
UNCOV
426
                c mig.ChannelCommitment, l *updateLocator, r *result) {
×
UNCOV
427

×
UNCOV
428
                defer wg.Done()
×
UNCOV
429

×
UNCOV
430
                // Find the output indexes.
×
UNCOV
431
                ourIndex, theirIndex, err := findOutputIndexes(chanState, &c)
×
UNCOV
432
                if err != nil {
×
433
                        r.errChan <- err
×
434
                }
×
435

436
                // Convert the old logs into the new logs. We do this early in
437
                // the read tx so the old large revocation log can be set to
438
                // nil here so save us some memory space.
UNCOV
439
                newLog, err := convertRevocationLog(
×
UNCOV
440
                        &c, ourIndex, theirIndex, cfg.GetNoAmountData(),
×
UNCOV
441
                )
×
UNCOV
442
                if err != nil {
×
443
                        r.errChan <- err
×
444
                }
×
445
                // Create the entry that will be used to create the new log.
UNCOV
446
                entry := &logEntry{
×
UNCOV
447
                        log:          newLog,
×
UNCOV
448
                        commitHeight: c.CommitHeight,
×
UNCOV
449
                        ourIndex:     ourIndex,
×
UNCOV
450
                        theirIndex:   theirIndex,
×
UNCOV
451
                        locator:      l,
×
UNCOV
452
                }
×
UNCOV
453

×
UNCOV
454
                r.newLog <- entry
×
455
        }
456

457
        // innerCb is the stepping function used when iterating the old log
458
        // bucket.
UNCOV
459
        innerCb := func(chanState *mig26.OpenChannel, l *updateLocator,
×
UNCOV
460
                _, v []byte) error {
×
UNCOV
461

×
UNCOV
462
                reader := bytes.NewReader(v)
×
UNCOV
463
                c, err := mig.DeserializeChanCommit(reader)
×
UNCOV
464
                if err != nil {
×
465
                        return err
×
466
                }
×
467

UNCOV
468
                r := &result{
×
UNCOV
469
                        newLog:  make(chan *logEntry, 1),
×
UNCOV
470
                        errChan: make(chan error, 1),
×
UNCOV
471
                }
×
UNCOV
472
                results = append(results, r)
×
UNCOV
473

×
UNCOV
474
                // We perform the log creation in a goroutine as it takes some
×
UNCOV
475
                // time to compute and find output indexes.
×
UNCOV
476
                wg.Add(1)
×
UNCOV
477
                go createLog(chanState, c, l, r)
×
UNCOV
478

×
UNCOV
479
                // Check the records read so far and signals exit when we've
×
UNCOV
480
                // reached our memory cap.
×
UNCOV
481
                if len(results) >= recordsPerTx {
×
482
                        return errExit
×
483
                }
×
484

UNCOV
485
                return nil
×
486
        }
487

488
        // cb is the callback function to be used when iterating the buckets.
UNCOV
489
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
490
                // Read the open channel.
×
UNCOV
491
                c := &mig26.OpenChannel{}
×
UNCOV
492
                err := mig26.FetchChanInfo(chanBucket, c, false)
×
UNCOV
493
                if err != nil {
×
494
                        return fmt.Errorf("unable to fetch chan info: %w", err)
×
495
                }
×
496

UNCOV
497
                err = fetchChanRevocationState(chanBucket, c)
×
UNCOV
498
                if err != nil {
×
499
                        return fmt.Errorf("unable to fetch revocation "+
×
500
                                "state: %v", err)
×
501
                }
×
502

503
                // Read the sub-bucket level 4.
UNCOV
504
                logBucket := chanBucket.NestedReadBucket(
×
UNCOV
505
                        revocationLogBucketDeprecated,
×
UNCOV
506
                )
×
UNCOV
507
                // Exit early if we don't have the old bucket.
×
UNCOV
508
                if logBucket == nil {
×
UNCOV
509
                        return nil
×
UNCOV
510
                }
×
511

512
                // Init the map key when needed.
UNCOV
513
                _, ok := entries[l]
×
UNCOV
514
                if !ok {
×
UNCOV
515
                        entries[l] = make([]*logEntry, 0, recordsPerTx)
×
UNCOV
516
                }
×
517

UNCOV
518
                return iterator(
×
UNCOV
519
                        logBucket, locator.nextHeight,
×
UNCOV
520
                        func(k, v []byte) error {
×
UNCOV
521
                                // Reset the nextHeight for following chan
×
UNCOV
522
                                // buckets.
×
UNCOV
523
                                locator.nextHeight = nil
×
UNCOV
524
                                return innerCb(c, l, k, v)
×
UNCOV
525
                        },
×
526
                )
527
        }
528

UNCOV
529
        err := iterateBuckets(openChanBucket, locator, cb)
×
UNCOV
530
        // If there's an error and it's not exit signal, we won't collect the
×
UNCOV
531
        // logs from the result channels.
×
UNCOV
532
        if err != nil && err != errExit {
×
533
                return nil, err
×
534
        }
×
535

536
        // Otherwise, collect the logs.
UNCOV
537
        err = collectLogs()
×
UNCOV
538

×
UNCOV
539
        return entries, err
×
540
}
541

542
// convertRevocationLog uses the fields `CommitTx` and `Htlcs` from a
543
// ChannelCommitment to construct a revocation log entry.
544
func convertRevocationLog(commit *mig.ChannelCommitment,
545
        ourOutputIndex, theirOutputIndex uint32,
UNCOV
546
        noAmtData bool) (*RevocationLog, error) {
×
UNCOV
547

×
UNCOV
548
        // Sanity check that the output indexes can be safely converted.
×
UNCOV
549
        if ourOutputIndex > math.MaxUint16 {
×
550
                return nil, ErrOutputIndexTooBig
×
551
        }
×
UNCOV
552
        if theirOutputIndex > math.MaxUint16 {
×
553
                return nil, ErrOutputIndexTooBig
×
554
        }
×
555

UNCOV
556
        rl := &RevocationLog{
×
UNCOV
557
                OurOutputIndex:   uint16(ourOutputIndex),
×
UNCOV
558
                TheirOutputIndex: uint16(theirOutputIndex),
×
UNCOV
559
                CommitTxHash:     commit.CommitTx.TxHash(),
×
UNCOV
560
                HTLCEntries:      make([]*HTLCEntry, 0, len(commit.Htlcs)),
×
UNCOV
561
        }
×
UNCOV
562

×
UNCOV
563
        if !noAmtData {
×
564
                rl.TheirBalance = &commit.RemoteBalance
×
565
                rl.OurBalance = &commit.LocalBalance
×
566
        }
×
567

UNCOV
568
        for _, htlc := range commit.Htlcs {
×
UNCOV
569
                // Skip dust HTLCs.
×
UNCOV
570
                if htlc.OutputIndex < 0 {
×
571
                        continue
×
572
                }
573

574
                // Sanity check that the output indexes can be safely
575
                // converted.
UNCOV
576
                if htlc.OutputIndex > math.MaxUint16 {
×
577
                        return nil, ErrOutputIndexTooBig
×
578
                }
×
579

UNCOV
580
                entry := &HTLCEntry{
×
UNCOV
581
                        RHash:         htlc.RHash,
×
UNCOV
582
                        RefundTimeout: htlc.RefundTimeout,
×
UNCOV
583
                        Incoming:      htlc.Incoming,
×
UNCOV
584
                        OutputIndex:   uint16(htlc.OutputIndex),
×
UNCOV
585
                        Amt:           htlc.Amt.ToSatoshis(),
×
UNCOV
586
                }
×
UNCOV
587
                rl.HTLCEntries = append(rl.HTLCEntries, entry)
×
588
        }
589

UNCOV
590
        return rl, nil
×
591
}
592

593
// validateMigration checks that the data saved in the new buckets match those
594
// saved in the old buckets. It does so by checking the last keys saved in both
595
// buckets can match, given the assumption that the `CommitHeight` is
596
// monotonically increased value so the last key represents the total number of
597
// records saved.
UNCOV
598
func validateMigration(tx kvdb.RwTx) error {
×
UNCOV
599
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
600

×
UNCOV
601
        // If no bucket is found, we can exit early.
×
UNCOV
602
        if openChanBucket == nil {
×
603
                return nil
×
604
        }
×
605

606
        // exitWithErr is a helper closure that prepends an error message with
607
        // the locator info.
UNCOV
608
        exitWithErr := func(l *updateLocator, msg string) error {
×
UNCOV
609
                return fmt.Errorf("unmatched records found under <nodePub=%x"+
×
UNCOV
610
                        ", chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
×
UNCOV
611
                        l.chainHash, l.fundingOutpoint, msg)
×
UNCOV
612
        }
×
613

614
        // cb is the callback function to be used when iterating the buckets.
UNCOV
615
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
616
                // Read both the old and new revocation log buckets.
×
UNCOV
617
                oldBucket := chanBucket.NestedReadBucket(
×
UNCOV
618
                        revocationLogBucketDeprecated,
×
UNCOV
619
                )
×
UNCOV
620
                newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
×
UNCOV
621

×
UNCOV
622
                // Exit early if the old bucket is nil.
×
UNCOV
623
                //
×
UNCOV
624
                // NOTE: the new bucket may not be nil here as new logs might
×
UNCOV
625
                // have been created using lnd@v0.15.0.
×
UNCOV
626
                if oldBucket == nil {
×
UNCOV
627
                        return nil
×
UNCOV
628
                }
×
629

630
                // Return an error if the expected new bucket cannot be found.
UNCOV
631
                if newBucket == nil {
×
632
                        return exitWithErr(l, "expected new bucket")
×
633
                }
×
634

635
                // Acquire the cursors.
UNCOV
636
                oldCursor := oldBucket.ReadCursor()
×
UNCOV
637
                newCursor := newBucket.ReadCursor()
×
UNCOV
638

×
UNCOV
639
                // Jump to the end of the cursors to do a quick check.
×
UNCOV
640
                newKey, _ := oldCursor.Last()
×
UNCOV
641
                oldKey, _ := newCursor.Last()
×
UNCOV
642

×
UNCOV
643
                // We expected the CommitHeights to be matched for nodes prior
×
UNCOV
644
                // to v0.15.0.
×
UNCOV
645
                if bytes.Equal(newKey, oldKey) {
×
UNCOV
646
                        return nil
×
UNCOV
647
                }
×
648

649
                // If the keys do not match, it's likely the node is running
650
                // v0.15.0 and have new logs created. In this case, we will
651
                // validate that every record in the old bucket can be found in
652
                // the new bucket.
UNCOV
653
                oldKey, _ = oldCursor.First()
×
UNCOV
654

×
UNCOV
655
                for {
×
UNCOV
656
                        // Try to locate the old key in the new bucket and we
×
UNCOV
657
                        // expect it to be found.
×
UNCOV
658
                        newKey, _ := newCursor.Seek(oldKey)
×
UNCOV
659

×
UNCOV
660
                        // If the old key is not found in the new bucket,
×
UNCOV
661
                        // return an error.
×
UNCOV
662
                        //
×
UNCOV
663
                        // NOTE: because Seek will return the next key when the
×
UNCOV
664
                        // passed key cannot be found, we need to compare the
×
UNCOV
665
                        // keys to deicde whether the old key is found or not.
×
UNCOV
666
                        if !bytes.Equal(newKey, oldKey) {
×
UNCOV
667
                                errMsg := fmt.Sprintf("old bucket has "+
×
UNCOV
668
                                        "CommitHeight=%v cannot be found in "+
×
UNCOV
669
                                        "new bucket", oldKey)
×
UNCOV
670
                                return exitWithErr(l, errMsg)
×
UNCOV
671
                        }
×
672

673
                        // Otherwise, keep iterating the old bucket.
UNCOV
674
                        oldKey, _ = oldCursor.Next()
×
UNCOV
675

×
UNCOV
676
                        // If we've done iterating, all keys have been matched
×
UNCOV
677
                        // and we can safely exit.
×
UNCOV
678
                        if oldKey == nil {
×
UNCOV
679
                                return nil
×
UNCOV
680
                        }
×
681
                }
682
        }
683

UNCOV
684
        return iterateBuckets(openChanBucket, nil, cb)
×
685
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc