• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.3
/routing/missioncontrol_store.go
1
package routing
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "encoding/binary"
7
        "fmt"
8
        "io"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/kvdb"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/tlv"
15
)
16

17
var (
18
        // resultsKey is the fixed key under which the attempt results are
19
        // stored.
20
        resultsKey = []byte("missioncontrol-results")
21

22
        // Big endian is the preferred byte order, due to cursor scans over
23
        // integer keys iterating in order.
24
        byteOrder = binary.BigEndian
25
)
26

27
// missionControlDB is an interface that defines the database methods that a
28
// single missionControlStore has access to. It allows the missionControlStore
29
// to be unaware of the overall DB structure and restricts its access to the DB
30
// by only providing it the bucket that it needs to care about.
31
type missionControlDB interface {
32
        // update can be used to perform reads and writes on the given bucket.
33
        update(f func(bkt kvdb.RwBucket) error, reset func()) error
34

35
        // view can be used to perform reads on the given bucket.
36
        view(f func(bkt kvdb.RBucket) error, reset func()) error
37

38
        // purge will delete all the contents in this store.
39
        purge() error
40
}
41

42
// missionControlStore is a bolt db based implementation of a mission control
43
// store. It stores the raw payment attempt data from which the internal mission
44
// controls state can be rederived on startup. This allows the mission control
45
// internal data structure to be changed without requiring a database migration.
46
// Also changes to mission control parameters can be applied to historical data.
47
// Finally, it enables importing raw data from an external source.
48
type missionControlStore struct {
49
        done chan struct{}
50
        wg   sync.WaitGroup
51
        db   missionControlDB
52

53
        // TODO(yy): Remove the usage of sync.Cond - we are better off using
54
        // channes than a Cond as suggested in the official godoc.
55
        //
56
        // queueCond is signalled when items are put into the queue.
57
        queueCond *sync.Cond
58

59
        // queue stores all pending payment results not yet added to the store.
60
        // Access is protected by the queueCond.L mutex.
61
        queue *list.List
62

63
        // keys holds the stored MC store item keys in the order of storage.
64
        // We use this list when adding/deleting items from the database to
65
        // avoid cursor use which may be slow in the remote DB case.
66
        keys *list.List
67

68
        // keysMap holds the stored MC store item keys. We use this map to check
69
        // if a new payment result has already been stored.
70
        keysMap map[string]struct{}
71

72
        // maxRecords is the maximum amount of records we will store in the db.
73
        maxRecords int
74

75
        // flushInterval is the configured interval we use to store new results
76
        // and delete outdated ones from the db.
77
        flushInterval time.Duration
78
}
79

80
func newMissionControlStore(db missionControlDB, maxRecords int,
81
        flushInterval time.Duration) (*missionControlStore, error) {
37✔
82

37✔
83
        var (
37✔
84
                keys    *list.List
37✔
85
                keysMap map[string]struct{}
37✔
86
        )
37✔
87

37✔
88
        // Create buckets if not yet existing.
37✔
89
        err := db.update(func(resultsBucket kvdb.RwBucket) error {
74✔
90
                // Collect all keys to be able to quickly calculate the
37✔
91
                // difference when updating the DB state.
37✔
92
                c := resultsBucket.ReadCursor()
37✔
93
                for k, _ := c.First(); k != nil; k, _ = c.Next() {
45✔
94
                        keys.PushBack(string(k))
8✔
95
                        keysMap[string(k)] = struct{}{}
8✔
96
                }
8✔
97

98
                return nil
37✔
99
        }, func() {
37✔
100
                keys = list.New()
37✔
101
                keysMap = make(map[string]struct{})
37✔
102
        })
37✔
103
        if err != nil {
37✔
104
                return nil, err
×
105
        }
×
106

107
        log.Infof("Loaded %d mission control entries", len(keysMap))
37✔
108

37✔
109
        return &missionControlStore{
37✔
110
                done:          make(chan struct{}),
37✔
111
                db:            db,
37✔
112
                queueCond:     sync.NewCond(&sync.Mutex{}),
37✔
113
                queue:         list.New(),
37✔
114
                keys:          keys,
37✔
115
                keysMap:       keysMap,
37✔
116
                maxRecords:    maxRecords,
37✔
117
                flushInterval: flushInterval,
37✔
118
        }, nil
37✔
119
}
120

121
// clear removes all results from the db.
122
func (b *missionControlStore) clear() error {
3✔
123
        b.queueCond.L.Lock()
3✔
124
        defer b.queueCond.L.Unlock()
3✔
125

3✔
126
        if err := b.db.purge(); err != nil {
3✔
127
                return err
×
128
        }
×
129

130
        b.queue = list.New()
3✔
131

3✔
132
        return nil
3✔
133
}
134

135
// fetchAll returns all results currently stored in the database.
136
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
47✔
137
        var results []*paymentResult
47✔
138

47✔
139
        err := b.db.view(func(resultBucket kvdb.RBucket) error {
94✔
140
                results = make([]*paymentResult, 0)
47✔
141

47✔
142
                return resultBucket.ForEach(func(k, v []byte) error {
70✔
143
                        result, err := deserializeResult(k, v)
23✔
144
                        if err != nil {
23✔
145
                                return err
×
146
                        }
×
147

148
                        results = append(results, result)
23✔
149

23✔
150
                        return nil
23✔
151
                })
152

153
        }, func() {
47✔
154
                results = nil
47✔
155
        })
47✔
156
        if err != nil {
47✔
157
                return nil, err
×
158
        }
×
159

160
        return results, nil
47✔
161
}
162

163
// serializeResult serializes a payment result and returns a key and value byte
164
// slice to insert into the bucket.
165
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
13✔
166
        recordProducers := []tlv.RecordProducer{
13✔
167
                &rp.timeFwd,
13✔
168
                &rp.timeReply,
13✔
169
                &rp.route,
13✔
170
        }
13✔
171

13✔
172
        rp.failure.WhenSome(
13✔
173
                func(failure tlv.RecordT[tlv.TlvType3, paymentFailure]) {
24✔
174
                        recordProducers = append(recordProducers, &failure)
11✔
175
                },
11✔
176
        )
177

178
        // Compose key that identifies this result.
179
        key := getResultKey(rp)
13✔
180

13✔
181
        var buff bytes.Buffer
13✔
182
        err := lnwire.EncodeRecordsTo(
13✔
183
                &buff, lnwire.ProduceRecordsSorted(recordProducers...),
13✔
184
        )
13✔
185
        if err != nil {
13✔
186
                return nil, nil, err
×
187
        }
×
188

189
        return key, buff.Bytes(), err
13✔
190
}
191

192
// deserializeResult deserializes a payment result.
193
func deserializeResult(k, v []byte) (*paymentResult, error) {
23✔
194
        // Parse payment id.
23✔
195
        result := paymentResult{
23✔
196
                id: byteOrder.Uint64(k[8:]),
23✔
197
        }
23✔
198

23✔
199
        failure := tlv.ZeroRecordT[tlv.TlvType3, paymentFailure]()
23✔
200
        recordProducers := []tlv.RecordProducer{
23✔
201
                &result.timeFwd,
23✔
202
                &result.timeReply,
23✔
203
                &result.route,
23✔
204
                &failure,
23✔
205
        }
23✔
206

23✔
207
        r := bytes.NewReader(v)
23✔
208
        typeMap, err := lnwire.DecodeRecords(
23✔
209
                r, lnwire.ProduceRecordsSorted(recordProducers...)...,
23✔
210
        )
23✔
211
        if err != nil {
23✔
212
                return nil, err
×
213
        }
×
214

215
        if _, ok := typeMap[result.failure.TlvType()]; ok {
44✔
216
                result.failure = tlv.SomeRecordT(failure)
21✔
217
        }
21✔
218

219
        return &result, nil
23✔
220
}
221

222
// serializeRoute serializes a mcRoute and writes the resulting bytes to the
223
// given io.Writer.
224
func serializeRoute(w io.Writer, r *mcRoute) error {
26✔
225
        records := lnwire.ProduceRecordsSorted(
26✔
226
                &r.sourcePubKey,
26✔
227
                &r.totalAmount,
26✔
228
                &r.hops,
26✔
229
        )
26✔
230

26✔
231
        return lnwire.EncodeRecordsTo(w, records)
26✔
232
}
26✔
233

234
// deserializeRoute deserializes the mcRoute from the given io.Reader.
235
func deserializeRoute(r io.Reader) (*mcRoute, error) {
23✔
236
        var rt mcRoute
23✔
237
        records := lnwire.ProduceRecordsSorted(
23✔
238
                &rt.sourcePubKey,
23✔
239
                &rt.totalAmount,
23✔
240
                &rt.hops,
23✔
241
        )
23✔
242

23✔
243
        _, err := lnwire.DecodeRecords(r, records...)
23✔
244
        if err != nil {
23✔
245
                return nil, err
×
246
        }
×
247

248
        return &rt, nil
23✔
249
}
250

251
// deserializeHop deserializes the mcHop from the given io.Reader.
252
func deserializeHop(r io.Reader) (*mcHop, error) {
27✔
253
        var (
27✔
254
                h        mcHop
27✔
255
                blinding = tlv.ZeroRecordT[tlv.TlvType3, lnwire.TrueBoolean]()
27✔
256
                custom   = tlv.ZeroRecordT[tlv.TlvType4, lnwire.TrueBoolean]()
27✔
257
        )
27✔
258
        records := lnwire.ProduceRecordsSorted(
27✔
259
                &h.channelID,
27✔
260
                &h.pubKeyBytes,
27✔
261
                &h.amtToFwd,
27✔
262
                &blinding,
27✔
263
                &custom,
27✔
264
        )
27✔
265

27✔
266
        typeMap, err := lnwire.DecodeRecords(r, records...)
27✔
267
        if err != nil {
27✔
268
                return nil, err
×
269
        }
×
270

271
        if _, ok := typeMap[h.hasBlindingPoint.TlvType()]; ok {
27✔
272
                h.hasBlindingPoint = tlv.SomeRecordT(blinding)
×
273
        }
×
274

275
        if _, ok := typeMap[h.hasCustomRecords.TlvType()]; ok {
46✔
276
                h.hasCustomRecords = tlv.SomeRecordT(custom)
19✔
277
        }
19✔
278

279
        return &h, nil
27✔
280
}
281

282
// serializeHop serializes a mcHop and writes the resulting bytes to the given
283
// io.Writer.
284
func serializeHop(w io.Writer, h *mcHop) error {
68✔
285
        recordProducers := []tlv.RecordProducer{
68✔
286
                &h.channelID,
68✔
287
                &h.pubKeyBytes,
68✔
288
                &h.amtToFwd,
68✔
289
        }
68✔
290

68✔
291
        h.hasBlindingPoint.WhenSome(func(
68✔
292
                hasBlinding tlv.RecordT[tlv.TlvType3, lnwire.TrueBoolean]) {
68✔
UNCOV
293

×
UNCOV
294
                recordProducers = append(recordProducers, &hasBlinding)
×
UNCOV
295
        })
×
296

297
        h.hasCustomRecords.WhenSome(func(
68✔
298
                hasCustom tlv.RecordT[tlv.TlvType4, lnwire.TrueBoolean]) {
104✔
299

36✔
300
                recordProducers = append(recordProducers, &hasCustom)
36✔
301
        })
36✔
302

303
        return lnwire.EncodeRecordsTo(
68✔
304
                w, lnwire.ProduceRecordsSorted(recordProducers...),
68✔
305
        )
68✔
306
}
307

308
// AddResult adds a new result to the db.
309
func (b *missionControlStore) AddResult(rp *paymentResult) {
78✔
310
        b.queueCond.L.Lock()
78✔
311
        b.queue.PushBack(rp)
78✔
312
        b.queueCond.L.Unlock()
78✔
313

78✔
314
        b.queueCond.Signal()
78✔
315
}
78✔
316

317
// stop stops the store ticker goroutine.
318
func (b *missionControlStore) stop() {
2✔
319
        close(b.done)
2✔
320

2✔
321
        b.queueCond.Signal()
2✔
322

2✔
323
        b.wg.Wait()
2✔
324
}
2✔
325

326
// run runs the MC store ticker goroutine.
327
func (b *missionControlStore) run() {
2✔
328
        b.wg.Add(1)
2✔
329

2✔
330
        go func() {
4✔
331
                defer b.wg.Done()
2✔
332

2✔
333
                timer := time.NewTimer(b.flushInterval)
2✔
334

2✔
335
                // Immediately stop the timer. It will be started once new
2✔
336
                // items are added to the store. As the doc for time.Timer
2✔
337
                // states, every call to Stop() done on a timer that is not
2✔
338
                // known to have been fired needs to be checked and the timer's
2✔
339
                // channel needs to be drained appropriately. This could happen
2✔
340
                // if the flushInterval is very small (e.g. 1 nanosecond).
2✔
341
                if !timer.Stop() {
2✔
342
                        select {
×
343
                        case <-timer.C:
×
344
                        case <-b.done:
×
345
                                log.Debugf("Stopping mission control store")
×
346
                        }
347
                }
348

349
                for {
7✔
350
                        // Wait for the queue to not be empty.
5✔
351
                        b.queueCond.L.Lock()
5✔
352
                        for b.queue.Front() == nil {
12✔
353
                                // To make sure we can properly stop, we must
7✔
354
                                // read the `done` channel first before
7✔
355
                                // attempting to call `Wait()`. This is due to
7✔
356
                                // the fact when `Signal` is called before the
7✔
357
                                // `Wait` call, the `Wait` call will block
7✔
358
                                // indefinitely.
7✔
359
                                //
7✔
360
                                // TODO(yy): replace this with channels.
7✔
361
                                select {
7✔
362
                                case <-b.done:
2✔
363
                                        b.queueCond.L.Unlock()
2✔
364

2✔
365
                                        return
2✔
366
                                default:
5✔
367
                                }
368

369
                                b.queueCond.Wait()
5✔
370
                        }
371
                        b.queueCond.L.Unlock()
3✔
372

3✔
373
                        // Restart the timer.
3✔
374
                        timer.Reset(b.flushInterval)
3✔
375

3✔
376
                        select {
3✔
377
                        case <-timer.C:
3✔
378
                                if err := b.storeResults(); err != nil {
3✔
379
                                        log.Errorf("Failed to update mission "+
×
380
                                                "control store: %v", err)
×
381
                                }
×
382

UNCOV
383
                        case <-b.done:
×
UNCOV
384
                                // Release the timer's resources.
×
UNCOV
385
                                if !timer.Stop() {
×
386
                                        select {
×
387
                                        case <-timer.C:
×
388
                                        case <-b.done:
×
389
                                                log.Debugf("Mission control " +
×
390
                                                        "store stopped")
×
391
                                        }
392
                                }
UNCOV
393
                                return
×
394
                        }
395
                }
396
        }()
397
}
398

399
// storeResults stores all accumulated results.
400
func (b *missionControlStore) storeResults() error {
9✔
401
        // We copy a reference to the queue and clear the original queue to be
9✔
402
        // able to release the lock.
9✔
403
        b.queueCond.L.Lock()
9✔
404
        l := b.queue
9✔
405

9✔
406
        if l.Len() == 0 {
9✔
407
                b.queueCond.L.Unlock()
×
408

×
409
                return nil
×
410
        }
×
411
        b.queue = list.New()
9✔
412
        b.queueCond.L.Unlock()
9✔
413

9✔
414
        var (
9✔
415
                newKeys    map[string]struct{}
9✔
416
                delKeys    []string
9✔
417
                storeCount int
9✔
418
                pruneCount int
9✔
419
        )
9✔
420

9✔
421
        // Create a deduped list of new entries.
9✔
422
        newKeys = make(map[string]struct{}, l.Len())
9✔
423
        for e := l.Front(); e != nil; e = e.Next() {
22✔
424
                pr, ok := e.Value.(*paymentResult)
13✔
425
                if !ok {
13✔
426
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
427
                                e.Value)
×
428
                }
×
429
                key := string(getResultKey(pr))
13✔
430
                if _, ok := b.keysMap[key]; ok {
13✔
431
                        l.Remove(e)
×
432
                        continue
×
433
                }
434
                if _, ok := newKeys[key]; ok {
14✔
435
                        l.Remove(e)
1✔
436
                        continue
1✔
437
                }
438
                newKeys[key] = struct{}{}
12✔
439
        }
440

441
        // Create a list of entries to delete.
442
        toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
9✔
443
        if b.maxRecords > 0 && toDelete > 0 {
13✔
444
                delKeys = make([]string, 0, toDelete)
4✔
445

4✔
446
                // Delete as many as needed from old keys.
4✔
447
                for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
9✔
448
                        key, ok := e.Value.(string)
5✔
449
                        if !ok {
5✔
450
                                return fmt.Errorf("wrong type %T (not string)",
×
451
                                        e.Value)
×
452
                        }
×
453
                        delKeys = append(delKeys, key)
5✔
454
                        e = e.Next()
5✔
455
                }
456

457
                // If more deletions are needed, simply do not add from the
458
                // list of new keys.
459
                for e := l.Front(); len(delKeys) < toDelete && e != nil; {
4✔
460
                        toDelete--
×
461
                        pr, ok := e.Value.(*paymentResult)
×
462
                        if !ok {
×
463
                                return fmt.Errorf("wrong type %T (not "+
×
464
                                        "*paymentResult )", e.Value)
×
465
                        }
×
466
                        key := string(getResultKey(pr))
×
467
                        delete(newKeys, key)
×
468
                        l.Remove(e)
×
469
                        e = l.Front()
×
470
                }
471
        }
472

473
        err := b.db.update(func(bucket kvdb.RwBucket) error {
18✔
474
                for e := l.Front(); e != nil; e = e.Next() {
22✔
475
                        pr, ok := e.Value.(*paymentResult)
13✔
476
                        if !ok {
13✔
477
                                return fmt.Errorf("wrong type %T (not "+
×
478
                                        "*paymentResult)", e.Value)
×
479
                        }
×
480

481
                        // Serialize result into key and value byte slices.
482
                        k, v, err := serializeResult(pr)
13✔
483
                        if err != nil {
13✔
484
                                return err
×
485
                        }
×
486

487
                        // Put into results bucket.
488
                        if err := bucket.Put(k, v); err != nil {
13✔
489
                                return err
×
490
                        }
×
491

492
                        storeCount++
13✔
493
                }
494

495
                // Prune oldest entries.
496
                for _, key := range delKeys {
14✔
497
                        if err := bucket.Delete([]byte(key)); err != nil {
5✔
498
                                return err
×
499
                        }
×
500
                        pruneCount++
5✔
501
                }
502

503
                return nil
9✔
504
        }, func() {
9✔
505
                storeCount, pruneCount = 0, 0
9✔
506
        })
9✔
507

508
        if err != nil {
9✔
509
                return err
×
510
        }
×
511

512
        log.Debugf("Stored mission control results: %d added, %d deleted",
9✔
513
                storeCount, pruneCount)
9✔
514

9✔
515
        // DB Update was successful, update the in-memory cache.
9✔
516
        for _, key := range delKeys {
14✔
517
                delete(b.keysMap, key)
5✔
518
                b.keys.Remove(b.keys.Front())
5✔
519
        }
5✔
520
        for e := l.Front(); e != nil; e = e.Next() {
22✔
521
                pr, ok := e.Value.(*paymentResult)
13✔
522
                if !ok {
13✔
523
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
524
                                e.Value)
×
525
                }
×
526
                key := string(getResultKey(pr))
13✔
527
                b.keys.PushBack(key)
13✔
528
        }
529

530
        return nil
9✔
531
}
532

533
// getResultKey returns a byte slice representing a unique key for this payment
534
// result.
535
func getResultKey(rp *paymentResult) []byte {
39✔
536
        var keyBytes [8 + 8 + 33]byte
39✔
537

39✔
538
        // Identify records by a combination of time, payment id and sender pub
39✔
539
        // key. This allows importing mission control data from an external
39✔
540
        // source without key collisions and keeps the records sorted
39✔
541
        // chronologically.
39✔
542
        byteOrder.PutUint64(keyBytes[:], rp.timeReply.Val)
39✔
543
        byteOrder.PutUint64(keyBytes[8:], rp.id)
39✔
544
        copy(keyBytes[16:], rp.route.Val.sourcePubKey.Val[:])
39✔
545

39✔
546
        return keyBytes[:]
39✔
547
}
39✔
548

549
// failureMessage wraps the lnwire.FailureMessage interface such that we can
550
// apply a Record method and use the failureMessage in a TLV encoded type.
551
type failureMessage struct {
552
        lnwire.FailureMessage
553
}
554

555
// Record returns a TLV record that can be used to encode/decode a list of
556
// failureMessage to/from a TLV stream.
557
func (r *failureMessage) Record() tlv.Record {
65✔
558
        recordSize := func() uint64 {
109✔
559
                var (
44✔
560
                        b   bytes.Buffer
44✔
561
                        buf [8]byte
44✔
562
                )
44✔
563
                if err := encodeFailureMessage(&b, r, &buf); err != nil {
44✔
564
                        panic(err)
×
565
                }
566

567
                return uint64(len(b.Bytes()))
44✔
568
        }
569

570
        return tlv.MakeDynamicRecord(
65✔
571
                0, r, recordSize, encodeFailureMessage, decodeFailureMessage,
65✔
572
        )
65✔
573
}
574

575
func encodeFailureMessage(w io.Writer, val interface{}, _ *[8]byte) error {
88✔
576
        if v, ok := val.(*failureMessage); ok {
176✔
577
                var b bytes.Buffer
88✔
578
                err := lnwire.EncodeFailureMessage(&b, v.FailureMessage, 0)
88✔
579
                if err != nil {
88✔
580
                        return err
×
581
                }
×
582

583
                _, err = w.Write(b.Bytes())
88✔
584

88✔
585
                return err
88✔
586
        }
587

588
        return tlv.NewTypeForEncodingErr(val, "routing.failureMessage")
×
589
}
590

591
func decodeFailureMessage(r io.Reader, val interface{}, _ *[8]byte,
592
        l uint64) error {
21✔
593

21✔
594
        if v, ok := val.(*failureMessage); ok {
42✔
595
                msg, err := lnwire.DecodeFailureMessage(r, 0)
21✔
596
                if err != nil {
21✔
597
                        return err
×
598
                }
×
599

600
                *v = failureMessage{
21✔
601
                        FailureMessage: msg,
21✔
602
                }
21✔
603

21✔
604
                return nil
21✔
605
        }
606

607
        return tlv.NewTypeForDecodingErr(val, "routing.failureMessage", l, l)
×
608
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc