• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.3
/routing/missioncontrol_store.go
1
package routing
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "encoding/binary"
7
        "fmt"
8
        "io"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/kvdb"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/tlv"
15
)
16

17
// resultsKey is the fixed key under which the attempt results are stored.
var resultsKey = []byte("missioncontrol-results")

// byteOrder is the byte order used for integer keys. Big endian is preferred
// because cursor scans over integer keys then iterate in numerical order.
var byteOrder = binary.BigEndian
26

27
// missionControlDB is an interface that defines the database methods that a
28
// single missionControlStore has access to. It allows the missionControlStore
29
// to be unaware of the overall DB structure and restricts its access to the DB
30
// by only providing it the bucket that it needs to care about.
31
type missionControlDB interface {
32
        // update can be used to perform reads and writes on the given bucket.
33
        update(f func(bkt kvdb.RwBucket) error, reset func()) error
34

35
        // view can be used to perform reads on the given bucket.
36
        view(f func(bkt kvdb.RBucket) error, reset func()) error
37

38
        // purge will delete all the contents in this store.
39
        purge() error
40
}
41

42
// missionControlStore is a bolt db based implementation of a mission control
43
// store. It stores the raw payment attempt data from which the internal mission
44
// controls state can be rederived on startup. This allows the mission control
45
// internal data structure to be changed without requiring a database migration.
46
// Also changes to mission control parameters can be applied to historical data.
47
// Finally, it enables importing raw data from an external source.
48
type missionControlStore struct {
49
        done chan struct{}
50
        wg   sync.WaitGroup
51
        db   missionControlDB
52

53
        // TODO(yy): Remove the usage of sync.Cond - we are better off using
54
        // channes than a Cond as suggested in the official godoc.
55
        //
56
        // queueCond is signalled when items are put into the queue.
57
        queueCond *sync.Cond
58

59
        // queue stores all pending payment results not yet added to the store.
60
        // Access is protected by the queueCond.L mutex.
61
        queue *list.List
62

63
        // keys holds the stored MC store item keys in the order of storage.
64
        // We use this list when adding/deleting items from the database to
65
        // avoid cursor use which may be slow in the remote DB case.
66
        keys *list.List
67

68
        // keysMap holds the stored MC store item keys. We use this map to check
69
        // if a new payment result has already been stored.
70
        keysMap map[string]struct{}
71

72
        // maxRecords is the maximum amount of records we will store in the db.
73
        maxRecords int
74

75
        // flushInterval is the configured interval we use to store new results
76
        // and delete outdated ones from the db.
77
        flushInterval time.Duration
78
}
79

80
func newMissionControlStore(db missionControlDB, maxRecords int,
81
        flushInterval time.Duration) (*missionControlStore, error) {
37✔
82

37✔
83
        var (
37✔
84
                keys    *list.List
37✔
85
                keysMap map[string]struct{}
37✔
86
        )
37✔
87

37✔
88
        // Create buckets if not yet existing.
37✔
89
        err := db.update(func(resultsBucket kvdb.RwBucket) error {
74✔
90
                // Collect all keys to be able to quickly calculate the
37✔
91
                // difference when updating the DB state.
37✔
92
                c := resultsBucket.ReadCursor()
37✔
93
                for k, _ := c.First(); k != nil; k, _ = c.Next() {
45✔
94
                        keys.PushBack(string(k))
8✔
95
                        keysMap[string(k)] = struct{}{}
8✔
96
                }
8✔
97

98
                return nil
37✔
99
        }, func() {
37✔
100
                keys = list.New()
37✔
101
                keysMap = make(map[string]struct{})
37✔
102
        })
37✔
103
        if err != nil {
37✔
104
                return nil, err
×
105
        }
×
106

107
        log.Infof("Loaded %d mission control entries", len(keysMap))
37✔
108

37✔
109
        return &missionControlStore{
37✔
110
                done:          make(chan struct{}),
37✔
111
                db:            db,
37✔
112
                queueCond:     sync.NewCond(&sync.Mutex{}),
37✔
113
                queue:         list.New(),
37✔
114
                keys:          keys,
37✔
115
                keysMap:       keysMap,
37✔
116
                maxRecords:    maxRecords,
37✔
117
                flushInterval: flushInterval,
37✔
118
        }, nil
37✔
119
}
120

121
// clear removes all results from the db.
122
func (b *missionControlStore) clear() error {
3✔
123
        b.queueCond.L.Lock()
3✔
124
        defer b.queueCond.L.Unlock()
3✔
125

3✔
126
        if err := b.db.purge(); err != nil {
3✔
127
                return err
×
128
        }
×
129

130
        b.queue = list.New()
3✔
131

3✔
132
        return nil
3✔
133
}
134

135
// fetchAll returns all results currently stored in the database.
136
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
47✔
137
        var results []*paymentResult
47✔
138

47✔
139
        err := b.db.view(func(resultBucket kvdb.RBucket) error {
94✔
140
                results = make([]*paymentResult, 0)
47✔
141

47✔
142
                return resultBucket.ForEach(func(k, v []byte) error {
70✔
143
                        result, err := deserializeResult(k, v)
23✔
144
                        if err != nil {
23✔
145
                                return err
×
146
                        }
×
147

148
                        results = append(results, result)
23✔
149

23✔
150
                        return nil
23✔
151
                })
152

153
        }, func() {
47✔
154
                results = nil
47✔
155
        })
47✔
156
        if err != nil {
47✔
157
                return nil, err
×
158
        }
×
159

160
        return results, nil
47✔
161
}
162

163
// serializeResult serializes a payment result and returns a key and value byte
164
// slice to insert into the bucket.
165
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
13✔
166
        recordProducers := []tlv.RecordProducer{
13✔
167
                &rp.timeFwd,
13✔
168
                &rp.timeReply,
13✔
169
                &rp.route,
13✔
170
        }
13✔
171

13✔
172
        rp.failure.WhenSome(
13✔
173
                func(failure tlv.RecordT[tlv.TlvType3, paymentFailure]) {
24✔
174
                        recordProducers = append(recordProducers, &failure)
11✔
175
                },
11✔
176
        )
177

178
        // Compose key that identifies this result.
179
        key := getResultKey(rp)
13✔
180

13✔
181
        var buff bytes.Buffer
13✔
182
        err := lnwire.EncodeRecordsTo(
13✔
183
                &buff, lnwire.ProduceRecordsSorted(recordProducers...),
13✔
184
        )
13✔
185
        if err != nil {
13✔
186
                return nil, nil, err
×
187
        }
×
188

189
        return key, buff.Bytes(), err
13✔
190
}
191

192
// deserializeResult deserializes a payment result.
193
func deserializeResult(k, v []byte) (*paymentResult, error) {
23✔
194
        // Parse payment id.
23✔
195
        result := paymentResult{
23✔
196
                id: byteOrder.Uint64(k[8:]),
23✔
197
        }
23✔
198

23✔
199
        failure := tlv.ZeroRecordT[tlv.TlvType3, paymentFailure]()
23✔
200
        recordProducers := []tlv.RecordProducer{
23✔
201
                &result.timeFwd,
23✔
202
                &result.timeReply,
23✔
203
                &result.route,
23✔
204
                &failure,
23✔
205
        }
23✔
206

23✔
207
        r := bytes.NewReader(v)
23✔
208
        typeMap, err := lnwire.DecodeRecords(
23✔
209
                r, lnwire.ProduceRecordsSorted(recordProducers...)...,
23✔
210
        )
23✔
211
        if err != nil {
23✔
212
                return nil, err
×
213
        }
×
214

215
        if _, ok := typeMap[result.failure.TlvType()]; ok {
44✔
216
                result.failure = tlv.SomeRecordT(failure)
21✔
217
        }
21✔
218

219
        return &result, nil
23✔
220
}
221

222
// serializeRoute serializes a mcRoute and writes the resulting bytes to the
223
// given io.Writer.
224
func serializeRoute(w io.Writer, r *mcRoute) error {
26✔
225
        records := lnwire.ProduceRecordsSorted(
26✔
226
                &r.sourcePubKey,
26✔
227
                &r.totalAmount,
26✔
228
                &r.hops,
26✔
229
        )
26✔
230

26✔
231
        return lnwire.EncodeRecordsTo(w, records)
26✔
232
}
26✔
233

234
// deserializeRoute deserializes the mcRoute from the given io.Reader.
235
func deserializeRoute(r io.Reader) (*mcRoute, error) {
23✔
236
        var rt mcRoute
23✔
237
        records := lnwire.ProduceRecordsSorted(
23✔
238
                &rt.sourcePubKey,
23✔
239
                &rt.totalAmount,
23✔
240
                &rt.hops,
23✔
241
        )
23✔
242

23✔
243
        _, err := lnwire.DecodeRecords(r, records...)
23✔
244
        if err != nil {
23✔
245
                return nil, err
×
246
        }
×
247

248
        return &rt, nil
23✔
249
}
250

251
// deserializeHop deserializes the mcHop from the given io.Reader.
252
func deserializeHop(r io.Reader) (*mcHop, error) {
27✔
253
        var (
27✔
254
                h        mcHop
27✔
255
                blinding = tlv.ZeroRecordT[tlv.TlvType3, lnwire.TrueBoolean]()
27✔
256
                custom   = tlv.ZeroRecordT[tlv.TlvType4, lnwire.TrueBoolean]()
27✔
257
        )
27✔
258
        records := lnwire.ProduceRecordsSorted(
27✔
259
                &h.channelID,
27✔
260
                &h.pubKeyBytes,
27✔
261
                &h.amtToFwd,
27✔
262
                &blinding,
27✔
263
                &custom,
27✔
264
        )
27✔
265

27✔
266
        typeMap, err := lnwire.DecodeRecords(r, records...)
27✔
267
        if err != nil {
27✔
268
                return nil, err
×
269
        }
×
270

271
        if _, ok := typeMap[h.hasBlindingPoint.TlvType()]; ok {
27✔
272
                h.hasBlindingPoint = tlv.SomeRecordT(blinding)
×
273
        }
×
274

275
        if _, ok := typeMap[h.hasCustomRecords.TlvType()]; ok {
46✔
276
                h.hasCustomRecords = tlv.SomeRecordT(custom)
19✔
277
        }
19✔
278

279
        return &h, nil
27✔
280
}
281

282
// serializeHop serializes a mcHop and writes the resulting bytes to the given
283
// io.Writer.
284
func serializeHop(w io.Writer, h *mcHop) error {
68✔
285
        recordProducers := []tlv.RecordProducer{
68✔
286
                &h.channelID,
68✔
287
                &h.pubKeyBytes,
68✔
288
                &h.amtToFwd,
68✔
289
        }
68✔
290

68✔
291
        h.hasBlindingPoint.WhenSome(func(
68✔
292
                hasBlinding tlv.RecordT[tlv.TlvType3, lnwire.TrueBoolean]) {
68✔
UNCOV
293

×
UNCOV
294
                recordProducers = append(recordProducers, &hasBlinding)
×
UNCOV
295
        })
×
296

297
        h.hasCustomRecords.WhenSome(func(
68✔
298
                hasCustom tlv.RecordT[tlv.TlvType4, lnwire.TrueBoolean]) {
104✔
299

36✔
300
                recordProducers = append(recordProducers, &hasCustom)
36✔
301
        })
36✔
302

303
        return lnwire.EncodeRecordsTo(
68✔
304
                w, lnwire.ProduceRecordsSorted(recordProducers...),
68✔
305
        )
68✔
306
}
307

308
// AddResult adds a new result to the db.
309
func (b *missionControlStore) AddResult(rp *paymentResult) {
78✔
310
        b.queueCond.L.Lock()
78✔
311
        b.queue.PushBack(rp)
78✔
312
        b.queueCond.L.Unlock()
78✔
313

78✔
314
        b.queueCond.Signal()
78✔
315
}
78✔
316

317
// stop stops the store ticker goroutine.
318
func (b *missionControlStore) stop() {
2✔
319
        close(b.done)
2✔
320

2✔
321
        b.queueCond.Signal()
2✔
322

2✔
323
        b.wg.Wait()
2✔
324
}
2✔
325

326
// run runs the MC store ticker goroutine.
327
func (b *missionControlStore) run() {
2✔
328
        b.wg.Add(1)
2✔
329

2✔
330
        go func() {
4✔
331
                defer b.wg.Done()
2✔
332

2✔
333
                timer := time.NewTimer(b.flushInterval)
2✔
334

2✔
335
                // Immediately stop the timer. It will be started once new
2✔
336
                // items are added to the store. As the doc for time.Timer
2✔
337
                // states, every call to Stop() done on a timer that is not
2✔
338
                // known to have been fired needs to be checked and the timer's
2✔
339
                // channel needs to be drained appropriately. This could happen
2✔
340
                // if the flushInterval is very small (e.g. 1 nanosecond).
2✔
341
                if !timer.Stop() {
2✔
342
                        select {
×
343
                        case <-timer.C:
×
344
                        case <-b.done:
×
345
                                log.Debugf("Stopping mission control store")
×
346
                        }
347
                }
348

349
                for {
7✔
350
                        // Wait for the queue to not be empty.
5✔
351
                        b.queueCond.L.Lock()
5✔
352
                        for b.queue.Front() == nil {
12✔
353
                                // To make sure we can properly stop, we must
7✔
354
                                // read the `done` channel first before
7✔
355
                                // attempting to call `Wait()`. This is due to
7✔
356
                                // the fact when `Signal` is called before the
7✔
357
                                // `Wait` call, the `Wait` call will block
7✔
358
                                // indefinitely.
7✔
359
                                //
7✔
360
                                // TODO(yy): replace this with channels.
7✔
361
                                select {
7✔
362
                                case <-b.done:
2✔
363
                                        b.queueCond.L.Unlock()
2✔
364

2✔
365
                                        return
2✔
366
                                default:
5✔
367
                                }
368

369
                                b.queueCond.Wait()
5✔
370
                        }
371
                        b.queueCond.L.Unlock()
3✔
372

3✔
373
                        // Restart the timer.
3✔
374
                        timer.Reset(b.flushInterval)
3✔
375

3✔
376
                        select {
3✔
377
                        case <-timer.C:
3✔
378
                                if err := b.storeResults(); err != nil {
3✔
379
                                        log.Errorf("Failed to update mission "+
×
380
                                                "control store: %v", err)
×
381
                                }
×
382

UNCOV
383
                        case <-b.done:
×
UNCOV
384
                                // Release the timer's resources.
×
UNCOV
385
                                if !timer.Stop() {
×
386
                                        select {
×
387
                                        case <-timer.C:
×
388
                                        case <-b.done:
×
389
                                                log.Debugf("Mission control " +
×
390
                                                        "store stopped")
×
391
                                        }
392
                                }
UNCOV
393
                                return
×
394
                        }
395
                }
396
        }()
397
}
398

399
// storeResults stores all accumulated results.
400
func (b *missionControlStore) storeResults() error {
9✔
401
        // We copy a reference to the queue and clear the original queue to be
9✔
402
        // able to release the lock.
9✔
403
        b.queueCond.L.Lock()
9✔
404
        l := b.queue
9✔
405

9✔
406
        if l.Len() == 0 {
9✔
407
                b.queueCond.L.Unlock()
×
408

×
409
                return nil
×
410
        }
×
411
        b.queue = list.New()
9✔
412
        b.queueCond.L.Unlock()
9✔
413

9✔
414
        var (
9✔
415
                newKeys    map[string]struct{}
9✔
416
                delKeys    []string
9✔
417
                storeCount int
9✔
418
                pruneCount int
9✔
419
        )
9✔
420

9✔
421
        // Create a deduped list of new entries.
9✔
422
        newKeys = make(map[string]struct{}, l.Len())
9✔
423
        for e := l.Front(); e != nil; e = e.Next() {
22✔
424
                pr, ok := e.Value.(*paymentResult)
13✔
425
                if !ok {
13✔
426
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
427
                                e.Value)
×
428
                }
×
429
                key := string(getResultKey(pr))
13✔
430
                if _, ok := b.keysMap[key]; ok {
13✔
431
                        l.Remove(e)
×
432
                        continue
×
433
                }
434
                if _, ok := newKeys[key]; ok {
14✔
435
                        l.Remove(e)
1✔
436
                        continue
1✔
437
                }
438
                newKeys[key] = struct{}{}
12✔
439
        }
440

441
        // Create a list of entries to delete.
442
        toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
9✔
443
        if b.maxRecords > 0 && toDelete > 0 {
13✔
444
                delKeys = make([]string, 0, toDelete)
4✔
445

4✔
446
                // Delete as many as needed from old keys.
4✔
447
                for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
9✔
448
                        key, ok := e.Value.(string)
5✔
449
                        if !ok {
5✔
450
                                return fmt.Errorf("wrong type %T (not string)",
×
451
                                        e.Value)
×
452
                        }
×
453
                        delKeys = append(delKeys, key)
5✔
454
                        e = e.Next()
5✔
455
                }
456

457
                // If more deletions are needed, simply do not add from the
458
                // list of new keys.
459
                for e := l.Front(); len(delKeys) < toDelete && e != nil; {
4✔
460
                        toDelete--
×
461
                        pr, ok := e.Value.(*paymentResult)
×
462
                        if !ok {
×
463
                                return fmt.Errorf("wrong type %T (not "+
×
464
                                        "*paymentResult )", e.Value)
×
465
                        }
×
466
                        key := string(getResultKey(pr))
×
467
                        delete(newKeys, key)
×
468
                        l.Remove(e)
×
469
                        e = l.Front()
×
470
                }
471
        }
472

473
        err := b.db.update(func(bucket kvdb.RwBucket) error {
18✔
474
                for e := l.Front(); e != nil; e = e.Next() {
22✔
475
                        pr, ok := e.Value.(*paymentResult)
13✔
476
                        if !ok {
13✔
477
                                return fmt.Errorf("wrong type %T (not "+
×
478
                                        "*paymentResult)", e.Value)
×
479
                        }
×
480

481
                        // Serialize result into key and value byte slices.
482
                        k, v, err := serializeResult(pr)
13✔
483
                        if err != nil {
13✔
484
                                return err
×
485
                        }
×
486

487
                        // Put into results bucket.
488
                        if err := bucket.Put(k, v); err != nil {
13✔
489
                                return err
×
490
                        }
×
491

492
                        storeCount++
13✔
493
                }
494

495
                // Prune oldest entries.
496
                for _, key := range delKeys {
14✔
497
                        if err := bucket.Delete([]byte(key)); err != nil {
5✔
498
                                return err
×
499
                        }
×
500
                        pruneCount++
5✔
501
                }
502

503
                return nil
9✔
504
        }, func() {
9✔
505
                storeCount, pruneCount = 0, 0
9✔
506
        })
9✔
507

508
        if err != nil {
9✔
509
                return err
×
510
        }
×
511

512
        log.Debugf("Stored mission control results: %d added, %d deleted",
9✔
513
                storeCount, pruneCount)
9✔
514

9✔
515
        // DB Update was successful, update the in-memory cache.
9✔
516
        for _, key := range delKeys {
14✔
517
                delete(b.keysMap, key)
5✔
518
                b.keys.Remove(b.keys.Front())
5✔
519
        }
5✔
520
        for e := l.Front(); e != nil; e = e.Next() {
22✔
521
                pr, ok := e.Value.(*paymentResult)
13✔
522
                if !ok {
13✔
523
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
524
                                e.Value)
×
525
                }
×
526
                key := string(getResultKey(pr))
13✔
527
                b.keys.PushBack(key)
13✔
528
        }
529

530
        return nil
9✔
531
}
532

533
// getResultKey returns a byte slice representing a unique key for this payment
534
// result.
535
func getResultKey(rp *paymentResult) []byte {
39✔
536
        var keyBytes [8 + 8 + 33]byte
39✔
537

39✔
538
        // Identify records by a combination of time, payment id and sender pub
39✔
539
        // key. This allows importing mission control data from an external
39✔
540
        // source without key collisions and keeps the records sorted
39✔
541
        // chronologically.
39✔
542
        byteOrder.PutUint64(keyBytes[:], rp.timeReply.Val)
39✔
543
        byteOrder.PutUint64(keyBytes[8:], rp.id)
39✔
544
        copy(keyBytes[16:], rp.route.Val.sourcePubKey.Val[:])
39✔
545

39✔
546
        return keyBytes[:]
39✔
547
}
39✔
548

549
// failureMessage wraps the lnwire.FailureMessage interface such that we can
550
// apply a Record method and use the failureMessage in a TLV encoded type.
551
type failureMessage struct {
552
        lnwire.FailureMessage
553
}
554

555
// Record returns a TLV record that can be used to encode/decode a list of
556
// failureMessage to/from a TLV stream.
557
func (r *failureMessage) Record() tlv.Record {
65✔
558
        recordSize := func() uint64 {
109✔
559
                var (
44✔
560
                        b   bytes.Buffer
44✔
561
                        buf [8]byte
44✔
562
                )
44✔
563
                if err := encodeFailureMessage(&b, r, &buf); err != nil {
44✔
564
                        panic(err)
×
565
                }
566

567
                return uint64(len(b.Bytes()))
44✔
568
        }
569

570
        return tlv.MakeDynamicRecord(
65✔
571
                0, r, recordSize, encodeFailureMessage, decodeFailureMessage,
65✔
572
        )
65✔
573
}
574

575
func encodeFailureMessage(w io.Writer, val interface{}, _ *[8]byte) error {
88✔
576
        if v, ok := val.(*failureMessage); ok {
176✔
577
                var b bytes.Buffer
88✔
578
                err := lnwire.EncodeFailureMessage(&b, v.FailureMessage, 0)
88✔
579
                if err != nil {
88✔
580
                        return err
×
581
                }
×
582

583
                _, err = w.Write(b.Bytes())
88✔
584

88✔
585
                return err
88✔
586
        }
587

588
        return tlv.NewTypeForEncodingErr(val, "routing.failureMessage")
×
589
}
590

591
func decodeFailureMessage(r io.Reader, val interface{}, _ *[8]byte,
592
        l uint64) error {
21✔
593

21✔
594
        if v, ok := val.(*failureMessage); ok {
42✔
595
                msg, err := lnwire.DecodeFailureMessage(r, 0)
21✔
596
                if err != nil {
21✔
597
                        return err
×
598
                }
×
599

600
                *v = failureMessage{
21✔
601
                        FailureMessage: msg,
21✔
602
                }
21✔
603

21✔
604
                return nil
21✔
605
        }
606

607
        return tlv.NewTypeForDecodingErr(val, "routing.failureMessage", l, l)
×
608
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc