• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12349698563

16 Dec 2024 09:29AM UTC coverage: 58.55% (-0.09%) from 58.636%
12349698563

Pull #9357

github

GeorgeTsagk
contractcourt: include custom records on replayed htlc

When notifying the invoice registry for an exit hop htlc we also want to
include its custom records. The channelLink, the other caller of this
method, already populates this field. So we make sure the contest
resolver does so too.
Pull Request #9357: contractcourt: include custom records on replayed htlc

2 of 2 new or added lines in 1 file covered. (100.0%)

262 existing lines in 24 files now uncovered.

134243 of 229278 relevant lines covered (58.55%)

19277.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.84
/channeldb/migration30/migration.go
1
package migration30
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "math"
8
        "sync"
9

10
        mig24 "github.com/lightningnetwork/lnd/channeldb/migration24"
11
        mig26 "github.com/lightningnetwork/lnd/channeldb/migration26"
12
        mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
)
15

16
// recordsPerTx specifies the number of records to be migrated in each database
17
// transaction. In the worst case, each old revocation log is 28,057 bytes.
18
// 20,000 records would consume 0.56 GB of ram, which is feasible for a modern
19
// machine.
20
//
21
// NOTE: we could've used more ram but it doesn't help with the speed of the
22
// migration since the most of the CPU time is used for calculating the output
23
// indexes.
24
const recordsPerTx = 20_000
25

26
// MigrateRevLogConfig is an interface that defines the config that should be
27
// passed to the MigrateRevocationLog function.
28
type MigrateRevLogConfig interface {
29
        // GetNoAmountData returns true if the amount data of revoked commitment
30
        // transactions should not be stored in the revocation log.
31
        GetNoAmountData() bool
32
}
33

34
// MigrateRevLogConfigImpl implements the MigrationRevLogConfig interface.
35
type MigrateRevLogConfigImpl struct {
36
        // NoAmountData if set to true will result in the amount data of revoked
37
        // commitment transactions not being stored in the revocation log.
38
        NoAmountData bool
39
}
40

41
// GetNoAmountData returns true if the amount data of revoked commitment
42
// transactions should not be stored in the revocation log.
43
func (c *MigrateRevLogConfigImpl) GetNoAmountData() bool {
2,826✔
44
        return c.NoAmountData
2,826✔
45
}
2,826✔
46

47
// MigrateRevocationLog migrates the old revocation logs into the newer format
48
// and deletes them once finished, with the deletion only happens once ALL the
49
// old logs have been migrates.
50
func MigrateRevocationLog(db kvdb.Backend, cfg MigrateRevLogConfig) error {
900✔
51
        log.Infof("Migrating revocation logs, might take a while...")
900✔
52

900✔
53
        var (
900✔
54
                err error
900✔
55

900✔
56
                // finished is used to exit the for loop.
900✔
57
                finished bool
900✔
58

900✔
59
                // total is the number of total records.
900✔
60
                total uint64
900✔
61

900✔
62
                // migrated is the number of already migrated records.
900✔
63
                migrated uint64
900✔
64
        )
900✔
65

900✔
66
        // First of all, read the stats of the revocation logs.
900✔
67
        total, migrated, err = logMigrationStat(db)
900✔
68
        if err != nil {
900✔
69
                return err
×
70
        }
×
71
        log.Infof("Total logs=%d, migrated=%d", total, migrated)
900✔
72

900✔
73
        // Exit early if the old logs have already been migrated and deleted.
900✔
74
        if total == 0 {
936✔
75
                log.Info("Migration already finished!")
36✔
76
                return nil
36✔
77
        }
36✔
78

79
        for {
3,348✔
80
                if finished {
3,348✔
81
                        log.Infof("Migrating old revocation logs finished, " +
864✔
82
                                "now checking the migration results...")
864✔
83
                        break
864✔
84
                }
85

86
                // Process the migration.
87
                err = kvdb.Update(db, func(tx kvdb.RwTx) error {
3,240✔
88
                        finished, err = processMigration(tx, cfg)
1,620✔
89
                        if err != nil {
1,620✔
90
                                return err
×
91
                        }
×
92
                        return nil
1,620✔
93
                }, func() {})
1,620✔
94
                if err != nil {
1,620✔
95
                        return err
×
96
                }
×
97

98
                // Each time we finished the above process, we'd read the stats
99
                // again to understand the current progress.
100
                total, migrated, err = logMigrationStat(db)
1,620✔
101
                if err != nil {
1,620✔
102
                        return err
×
103
                }
×
104

105
                // Calculate and log the progress if the progress is less than
106
                // one.
107
                progress := float64(migrated) / float64(total) * 100
1,620✔
108
                if progress >= 100 {
3,240✔
109
                        continue
1,620✔
110
                }
111

112
                log.Infof("Migration progress: %.3f%%, still have: %d",
×
113
                        progress, total-migrated)
×
114
        }
115

116
        // Before we can safety delete the old buckets, we perform a check to
117
        // make sure the logs are migrated as expected.
118
        err = kvdb.Update(db, validateMigration, func() {})
1,728✔
119
        if err != nil {
864✔
120
                return fmt.Errorf("validate migration failed: %w", err)
×
121
        }
×
122

123
        log.Info("Migration check passed, now deleting the old logs...")
864✔
124

864✔
125
        // Once the migration completes, we can now safety delete the old
864✔
126
        // revocation logs.
864✔
127
        if err := deleteOldBuckets(db); err != nil {
864✔
128
                return fmt.Errorf("deleteOldBuckets err: %w", err)
×
129
        }
×
130

131
        log.Info("Old revocation log buckets removed!")
864✔
132
        return nil
864✔
133
}
134

135
// processMigration finds the next un-migrated revocation logs, reads a max
136
// number of `recordsPerTx` records, converts them into the new revocation logs
137
// and save them to disk.
138
func processMigration(tx kvdb.RwTx, cfg MigrateRevLogConfig) (bool, error) {
1,620✔
139
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
1,620✔
140

1,620✔
141
        // If no bucket is found, we can exit early.
1,620✔
142
        if openChanBucket == nil {
1,620✔
143
                return false, fmt.Errorf("root bucket not found")
×
144
        }
×
145

146
        // Locate the next migration height.
147
        locator, err := locateNextUpdateNum(openChanBucket)
1,620✔
148
        if err != nil {
1,620✔
149
                return false, fmt.Errorf("locator got error: %w", err)
×
150
        }
×
151

152
        // If the returned locator is nil, we've done migrating the logs.
153
        if locator == nil {
2,484✔
154
                return true, nil
864✔
155
        }
864✔
156

157
        // Read a list of old revocation logs.
158
        entryMap, err := readOldRevocationLogs(openChanBucket, locator, cfg)
756✔
159
        if err != nil {
756✔
160
                return false, fmt.Errorf("read old logs err: %w", err)
×
161
        }
×
162

163
        // Migrate the revocation logs.
164
        return false, writeRevocationLogs(openChanBucket, entryMap)
756✔
165
}
166

167
// deleteOldBuckets iterates all the channel buckets and deletes the old
168
// revocation buckets.
169
func deleteOldBuckets(db kvdb.Backend) error {
864✔
170
        // locators records all the chan buckets found in the database.
864✔
171
        var locators []*updateLocator
864✔
172

864✔
173
        // reader is a helper closure that saves the locator found. Each
864✔
174
        // locator is relatively small(33+32+36+8=109 bytes), assuming 1 GB of
864✔
175
        // ram we can fit roughly 10 million records. Since each record
864✔
176
        // corresponds to a channel, we should have more than enough memory to
864✔
177
        // read them all.
864✔
178
        reader := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam
3,444✔
179
                locators = append(locators, l)
2,580✔
180
                return nil
2,580✔
181
        }
2,580✔
182

183
        // remover is a helper closure that removes the old revocation log
184
        // bucket under the specified chan bucket by the given locator.
185
        remover := func(rootBucket kvdb.RwBucket, l *updateLocator) error {
3,444✔
186
                chanBucket, err := l.locateChanBucket(rootBucket)
2,580✔
187
                if err != nil {
2,580✔
188
                        return err
×
189
                }
×
190

191
                return chanBucket.DeleteNestedBucket(
2,580✔
192
                        revocationLogBucketDeprecated,
2,580✔
193
                )
2,580✔
194
        }
195

196
        // Perform the deletion in one db transaction. This should not cause
197
        // any memory issue as the deletion doesn't load any data from the
198
        // buckets.
199
        return kvdb.Update(db, func(tx kvdb.RwTx) error {
1,728✔
200
                openChanBucket := tx.ReadWriteBucket(openChannelBucket)
864✔
201

864✔
202
                // Exit early if there's no bucket.
864✔
203
                if openChanBucket == nil {
864✔
204
                        return nil
×
205
                }
×
206

207
                // Iterate the buckets to find all the locators.
208
                err := iterateBuckets(openChanBucket, nil, reader)
864✔
209
                if err != nil {
864✔
210
                        return err
×
211
                }
×
212

213
                // Iterate the locators and delete all the old revocation log
214
                // buckets.
215
                for _, l := range locators {
3,444✔
216
                        err := remover(openChanBucket, l)
2,580✔
217
                        // If the bucket doesn't exist, we can exit safety.
2,580✔
218
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,580✔
219
                                return err
×
220
                        }
×
221
                }
222

223
                return nil
864✔
224
        }, func() {})
864✔
225
}
226

227
// writeRevocationLogs unwraps the entryMap and writes the new revocation logs.
228
func writeRevocationLogs(openChanBucket kvdb.RwBucket,
229
        entryMap logEntries) error {
756✔
230

756✔
231
        for locator, logs := range entryMap {
2,358✔
232
                // Find the channel bucket.
1,602✔
233
                chanBucket, err := locator.locateChanBucket(openChanBucket)
1,602✔
234
                if err != nil {
1,602✔
235
                        return fmt.Errorf("locateChanBucket err: %w", err)
×
236
                }
×
237

238
                // Create the new log bucket.
239
                logBucket, err := chanBucket.CreateBucketIfNotExists(
1,602✔
240
                        revocationLogBucket,
1,602✔
241
                )
1,602✔
242
                if err != nil {
1,602✔
243
                        return fmt.Errorf("create log bucket err: %w", err)
×
244
                }
×
245

246
                // Write the new logs.
247
                for _, entry := range logs {
4,428✔
248
                        var b bytes.Buffer
2,826✔
249
                        err := serializeRevocationLog(&b, entry.log)
2,826✔
250
                        if err != nil {
2,826✔
251
                                return err
×
252
                        }
×
253

254
                        logEntrykey := mig24.MakeLogKey(entry.commitHeight)
2,826✔
255
                        err = logBucket.Put(logEntrykey[:], b.Bytes())
2,826✔
256
                        if err != nil {
2,826✔
257
                                return fmt.Errorf("putRevocationLog err: %w",
×
258
                                        err)
×
259
                        }
×
260
                }
261
        }
262

263
        return nil
756✔
264
}
265

266
// logMigrationStat reads the buckets to provide stats over current migration
267
// progress. The returned values are the numbers of total records and already
268
// migrated records.
269
func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) {
2,520✔
270
        var (
2,520✔
271
                err error
2,520✔
272

2,520✔
273
                // total is the number of total records.
2,520✔
274
                total uint64
2,520✔
275

2,520✔
276
                // unmigrated is the number of unmigrated records.
2,520✔
277
                unmigrated uint64
2,520✔
278
        )
2,520✔
279

2,520✔
280
        err = kvdb.Update(db, func(tx kvdb.RwTx) error {
5,040✔
281
                total, unmigrated, err = fetchLogStats(tx)
2,520✔
282
                return err
2,520✔
283
        }, func() {})
5,040✔
284

285
        log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated)
2,520✔
286
        return total, total - unmigrated, err
2,520✔
287
}
288

289
// fetchLogStats iterates all the chan buckets to provide stats about the logs.
290
// The returned values are num of total records, and num of un-migrated
291
// records.
292
func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) {
2,520✔
293
        var (
2,520✔
294
                total           uint64
2,520✔
295
                totalUnmigrated uint64
2,520✔
296
        )
2,520✔
297

2,520✔
298
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
2,520✔
299

2,520✔
300
        // If no bucket is found, we can exit early.
2,520✔
301
        if openChanBucket == nil {
2,520✔
302
                return 0, 0, fmt.Errorf("root bucket not found")
×
303
        }
×
304

305
        // counter is a helper closure used to count the number of records
306
        // based on the given bucket.
307
        counter := func(chanBucket kvdb.RwBucket, bucket []byte) uint64 {
11,928✔
308
                // Read the sub-bucket level 4.
9,408✔
309
                logBucket := chanBucket.NestedReadBucket(bucket)
9,408✔
310

9,408✔
311
                // Exit early if we don't have the bucket.
9,408✔
312
                if logBucket == nil {
11,442✔
313
                        return 0
2,034✔
314
                }
2,034✔
315

316
                // Jump to the end of the cursor.
317
                key, _ := logBucket.ReadCursor().Last()
7,374✔
318

7,374✔
319
                // Since the CommitHeight is a zero-based monotonically
7,374✔
320
                // increased index, its value plus one reflects the total
7,374✔
321
                // records under this chan bucket.
7,374✔
322
                lastHeight := binary.BigEndian.Uint64(key) + 1
7,374✔
323

7,374✔
324
                return lastHeight
7,374✔
325
        }
326

327
        // countTotal is a callback function used to count the total number of
328
        // records.
329
        countTotal := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
10,044✔
330
                total += counter(chanBucket, revocationLogBucketDeprecated)
7,524✔
331
                return nil
7,524✔
332
        }
7,524✔
333

334
        // countUnmigrated is a callback function used to count the total
335
        // number of un-migrated records.
336
        countUnmigrated := func(chanBucket kvdb.RwBucket,
2,520✔
337
                l *updateLocator) error {
4,404✔
338

1,884✔
339
                totalUnmigrated += counter(
1,884✔
340
                        chanBucket, revocationLogBucketDeprecated,
1,884✔
341
                )
1,884✔
342
                return nil
1,884✔
343
        }
1,884✔
344

345
        // Locate the next migration height.
346
        locator, err := locateNextUpdateNum(openChanBucket)
2,520✔
347
        if err != nil {
2,520✔
348
                return 0, 0, fmt.Errorf("locator got error: %w", err)
×
349
        }
×
350

351
        // If the returned locator is not nil, we still have un-migrated
352
        // records so we need to count them. Otherwise we've done migrating the
353
        // logs.
354
        if locator != nil {
3,276✔
355
                err = iterateBuckets(openChanBucket, locator, countUnmigrated)
756✔
356
                if err != nil {
756✔
357
                        return 0, 0, err
×
358
                }
×
359
        }
360

361
        // Count the total number of records by supplying a nil locator.
362
        err = iterateBuckets(openChanBucket, nil, countTotal)
2,520✔
363
        if err != nil {
2,520✔
364
                return 0, 0, err
×
365
        }
×
366

367
        return total, totalUnmigrated, err
2,520✔
368
}
369

370
// logEntry houses the info needed to write a new revocation log.
371
type logEntry struct {
372
        log          *RevocationLog
373
        commitHeight uint64
374
        ourIndex     uint32
375
        theirIndex   uint32
376
        locator      *updateLocator
377
}
378

379
// logEntries maps a bucket locator to a list of entries under that bucket.
380
type logEntries map[*updateLocator][]*logEntry
381

382
// result is made of two channels that's used to send back the constructed new
383
// revocation log or an error.
384
type result struct {
385
        newLog  chan *logEntry
386
        errChan chan error
387
}
388

389
// readOldRevocationLogs finds a list of old revocation logs and converts them
390
// into the new revocation logs.
391
func readOldRevocationLogs(openChanBucket kvdb.RwBucket,
392
        locator *updateLocator, cfg MigrateRevLogConfig) (logEntries, error) {
756✔
393

756✔
394
        entries := make(logEntries)
756✔
395
        results := make([]*result, 0)
756✔
396

756✔
397
        var wg sync.WaitGroup
756✔
398

756✔
399
        // collectLogs is a helper closure that reads all newly created
756✔
400
        // revocation logs sent over the result channels.
756✔
401
        //
756✔
402
        // NOTE: the order of the logs cannot be guaranteed, which is fine as
756✔
403
        // boltdb will take care of the orders when saving them.
756✔
404
        collectLogs := func() error {
1,512✔
405
                wg.Wait()
756✔
406

756✔
407
                for _, r := range results {
3,582✔
408
                        select {
2,826✔
409
                        case entry := <-r.newLog:
2,826✔
410
                                entries[entry.locator] = append(
2,826✔
411
                                        entries[entry.locator], entry,
2,826✔
412
                                )
2,826✔
413

414
                        case err := <-r.errChan:
×
415
                                return err
×
416
                        }
417
                }
418

419
                return nil
756✔
420
        }
421

422
        // createLog is a helper closure that constructs a new revocation log.
423
        //
424
        // NOTE: used as a goroutine.
425
        createLog := func(chanState *mig26.OpenChannel,
756✔
426
                c mig.ChannelCommitment, l *updateLocator, r *result) {
3,582✔
427

2,826✔
428
                defer wg.Done()
2,826✔
429

2,826✔
430
                // Find the output indexes.
2,826✔
431
                ourIndex, theirIndex, err := findOutputIndexes(chanState, &c)
2,826✔
432
                if err != nil {
2,826✔
433
                        r.errChan <- err
×
434
                }
×
435

436
                // Convert the old logs into the new logs. We do this early in
437
                // the read tx so the old large revocation log can be set to
438
                // nil here so save us some memory space.
439
                newLog, err := convertRevocationLog(
2,826✔
440
                        &c, ourIndex, theirIndex, cfg.GetNoAmountData(),
2,826✔
441
                )
2,826✔
442
                if err != nil {
2,826✔
443
                        r.errChan <- err
×
444
                }
×
445
                // Create the entry that will be used to create the new log.
446
                entry := &logEntry{
2,826✔
447
                        log:          newLog,
2,826✔
448
                        commitHeight: c.CommitHeight,
2,826✔
449
                        ourIndex:     ourIndex,
2,826✔
450
                        theirIndex:   theirIndex,
2,826✔
451
                        locator:      l,
2,826✔
452
                }
2,826✔
453

2,826✔
454
                r.newLog <- entry
2,826✔
455
        }
456

457
        // innerCb is the stepping function used when iterating the old log
458
        // bucket.
459
        innerCb := func(chanState *mig26.OpenChannel, l *updateLocator,
756✔
460
                _, v []byte) error {
3,582✔
461

2,826✔
462
                reader := bytes.NewReader(v)
2,826✔
463
                c, err := mig.DeserializeChanCommit(reader)
2,826✔
464
                if err != nil {
2,826✔
465
                        return err
×
466
                }
×
467

468
                r := &result{
2,826✔
469
                        newLog:  make(chan *logEntry, 1),
2,826✔
470
                        errChan: make(chan error, 1),
2,826✔
471
                }
2,826✔
472
                results = append(results, r)
2,826✔
473

2,826✔
474
                // We perform the log creation in a goroutine as it takes some
2,826✔
475
                // time to compute and find output indexes.
2,826✔
476
                wg.Add(1)
2,826✔
477
                go createLog(chanState, c, l, r)
2,826✔
478

2,826✔
479
                // Check the records read so far and signals exit when we've
2,826✔
480
                // reached our memory cap.
2,826✔
481
                if len(results) >= recordsPerTx {
2,826✔
482
                        return errExit
×
483
                }
×
484

485
                return nil
2,826✔
486
        }
487

488
        // cb is the callback function to be used when iterating the buckets.
489
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
2,640✔
490
                // Read the open channel.
1,884✔
491
                c := &mig26.OpenChannel{}
1,884✔
492
                err := mig26.FetchChanInfo(chanBucket, c, false)
1,884✔
493
                if err != nil {
1,884✔
494
                        return fmt.Errorf("unable to fetch chan info: %w", err)
×
495
                }
×
496

497
                err = fetchChanRevocationState(chanBucket, c)
1,884✔
498
                if err != nil {
1,884✔
499
                        return fmt.Errorf("unable to fetch revocation "+
×
500
                                "state: %v", err)
×
501
                }
×
502

503
                // Read the sub-bucket level 4.
504
                logBucket := chanBucket.NestedReadBucket(
1,884✔
505
                        revocationLogBucketDeprecated,
1,884✔
506
                )
1,884✔
507
                // Exit early if we don't have the old bucket.
1,884✔
508
                if logBucket == nil {
2,166✔
509
                        return nil
282✔
510
                }
282✔
511

512
                // Init the map key when needed.
513
                _, ok := entries[l]
1,602✔
514
                if !ok {
3,204✔
515
                        entries[l] = make([]*logEntry, 0, recordsPerTx)
1,602✔
516
                }
1,602✔
517

518
                return iterator(
1,602✔
519
                        logBucket, locator.nextHeight,
1,602✔
520
                        func(k, v []byte) error {
4,428✔
521
                                // Reset the nextHeight for following chan
2,826✔
522
                                // buckets.
2,826✔
523
                                locator.nextHeight = nil
2,826✔
524
                                return innerCb(c, l, k, v)
2,826✔
525
                        },
2,826✔
526
                )
527
        }
528

529
        err := iterateBuckets(openChanBucket, locator, cb)
756✔
530
        // If there's an error and it's not exit signal, we won't collect the
756✔
531
        // logs from the result channels.
756✔
532
        if err != nil && err != errExit {
756✔
533
                return nil, err
×
534
        }
×
535

536
        // Otherwise, collect the logs.
537
        err = collectLogs()
756✔
538

756✔
539
        return entries, err
756✔
540
}
541

542
// convertRevocationLog uses the fields `CommitTx` and `Htlcs` from a
543
// ChannelCommitment to construct a revocation log entry.
544
func convertRevocationLog(commit *mig.ChannelCommitment,
545
        ourOutputIndex, theirOutputIndex uint32,
546
        noAmtData bool) (*RevocationLog, error) {
2,826✔
547

2,826✔
548
        // Sanity check that the output indexes can be safely converted.
2,826✔
549
        if ourOutputIndex > math.MaxUint16 {
2,826✔
550
                return nil, ErrOutputIndexTooBig
×
551
        }
×
552
        if theirOutputIndex > math.MaxUint16 {
2,826✔
553
                return nil, ErrOutputIndexTooBig
×
554
        }
×
555

556
        rl := &RevocationLog{
2,826✔
557
                OurOutputIndex:   uint16(ourOutputIndex),
2,826✔
558
                TheirOutputIndex: uint16(theirOutputIndex),
2,826✔
559
                CommitTxHash:     commit.CommitTx.TxHash(),
2,826✔
560
                HTLCEntries:      make([]*HTLCEntry, 0, len(commit.Htlcs)),
2,826✔
561
        }
2,826✔
562

2,826✔
563
        if !noAmtData {
2,826✔
UNCOV
564
                rl.TheirBalance = &commit.RemoteBalance
×
UNCOV
565
                rl.OurBalance = &commit.LocalBalance
×
UNCOV
566
        }
×
567

568
        for _, htlc := range commit.Htlcs {
4,428✔
569
                // Skip dust HTLCs.
1,602✔
570
                if htlc.OutputIndex < 0 {
1,602✔
571
                        continue
×
572
                }
573

574
                // Sanity check that the output indexes can be safely
575
                // converted.
576
                if htlc.OutputIndex > math.MaxUint16 {
1,602✔
577
                        return nil, ErrOutputIndexTooBig
×
578
                }
×
579

580
                entry := &HTLCEntry{
1,602✔
581
                        RHash:         htlc.RHash,
1,602✔
582
                        RefundTimeout: htlc.RefundTimeout,
1,602✔
583
                        Incoming:      htlc.Incoming,
1,602✔
584
                        OutputIndex:   uint16(htlc.OutputIndex),
1,602✔
585
                        Amt:           htlc.Amt.ToSatoshis(),
1,602✔
586
                }
1,602✔
587
                rl.HTLCEntries = append(rl.HTLCEntries, entry)
1,602✔
588
        }
589

590
        return rl, nil
2,826✔
591
}
592

593
// validateMigration checks that the data saved in the new buckets match those
594
// saved in the old buckets. It does so by checking the last keys saved in both
595
// buckets can match, given the assumption that the `CommitHeight` is
596
// monotonically increased value so the last key represents the total number of
597
// records saved.
598
func validateMigration(tx kvdb.RwTx) error {
868✔
599
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
868✔
600

868✔
601
        // If no bucket is found, we can exit early.
868✔
602
        if openChanBucket == nil {
868✔
603
                return nil
×
604
        }
×
605

606
        // exitWithErr is a helper closure that prepends an error message with
607
        // the locator info.
608
        exitWithErr := func(l *updateLocator, msg string) error {
870✔
609
                return fmt.Errorf("unmatched records found under <nodePub=%x"+
2✔
610
                        ", chainHash=%x, fundingOutpoint=%x>: %v", l.nodePub,
2✔
611
                        l.chainHash, l.fundingOutpoint, msg)
2✔
612
        }
2✔
613

614
        // cb is the callback function to be used when iterating the buckets.
615
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
3,452✔
616
                // Read both the old and new revocation log buckets.
2,584✔
617
                oldBucket := chanBucket.NestedReadBucket(
2,584✔
618
                        revocationLogBucketDeprecated,
2,584✔
619
                )
2,584✔
620
                newBucket := chanBucket.NestedReadBucket(revocationLogBucket)
2,584✔
621

2,584✔
622
                // Exit early if the old bucket is nil.
2,584✔
623
                //
2,584✔
624
                // NOTE: the new bucket may not be nil here as new logs might
2,584✔
625
                // have been created using lnd@v0.15.0.
2,584✔
626
                if oldBucket == nil {
3,184✔
627
                        return nil
600✔
628
                }
600✔
629

630
                // Return an error if the expected new bucket cannot be found.
631
                if newBucket == nil {
1,984✔
632
                        return exitWithErr(l, "expected new bucket")
×
633
                }
×
634

635
                // Acquire the cursors.
636
                oldCursor := oldBucket.ReadCursor()
1,984✔
637
                newCursor := newBucket.ReadCursor()
1,984✔
638

1,984✔
639
                // Jump to the end of the cursors to do a quick check.
1,984✔
640
                newKey, _ := oldCursor.Last()
1,984✔
641
                oldKey, _ := newCursor.Last()
1,984✔
642

1,984✔
643
                // We expected the CommitHeights to be matched for nodes prior
1,984✔
644
                // to v0.15.0.
1,984✔
645
                if bytes.Equal(newKey, oldKey) {
3,965✔
646
                        return nil
1,981✔
647
                }
1,981✔
648

649
                // If the keys do not match, it's likely the node is running
650
                // v0.15.0 and have new logs created. In this case, we will
651
                // validate that every record in the old bucket can be found in
652
                // the new bucket.
653
                oldKey, _ = oldCursor.First()
3✔
654

3✔
655
                for {
9✔
656
                        // Try to locate the old key in the new bucket and we
6✔
657
                        // expect it to be found.
6✔
658
                        newKey, _ := newCursor.Seek(oldKey)
6✔
659

6✔
660
                        // If the old key is not found in the new bucket,
6✔
661
                        // return an error.
6✔
662
                        //
6✔
663
                        // NOTE: because Seek will return the next key when the
6✔
664
                        // passed key cannot be found, we need to compare the
6✔
665
                        // keys to deicde whether the old key is found or not.
6✔
666
                        if !bytes.Equal(newKey, oldKey) {
8✔
667
                                errMsg := fmt.Sprintf("old bucket has "+
2✔
668
                                        "CommitHeight=%v cannot be found in "+
2✔
669
                                        "new bucket", oldKey)
2✔
670
                                return exitWithErr(l, errMsg)
2✔
671
                        }
2✔
672

673
                        // Otherwise, keep iterating the old bucket.
674
                        oldKey, _ = oldCursor.Next()
4✔
675

4✔
676
                        // If we've done iterating, all keys have been matched
4✔
677
                        // and we can safely exit.
4✔
678
                        if oldKey == nil {
5✔
679
                                return nil
1✔
680
                        }
1✔
681
                }
682
        }
683

684
        return iterateBuckets(openChanBucket, nil, cb)
868✔
685
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc