• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12583319996

02 Jan 2025 01:38PM UTC coverage: 57.522% (-1.1%) from 58.598%
12583319996

Pull #9361

github

starius
fn/ContextGuard: use context.AfterFunc to wait

Simplifies context cancellation handling by using context.AfterFunc instead of a
goroutine to wait for context cancellation. This approach avoids the overhead of
a goroutine during the waiting period.

For ctxQuitUnsafe, since g.quit is closed only in the Quit method (which also
cancels all associated contexts), waiting on context cancellation ensures the
same behavior without unnecessary dependency on g.quit.

Added a test to ensure that the Create method does not launch any goroutines.
Pull Request #9361: fn: optimize context guard

102587 of 178344 relevant lines covered (57.52%)

24734.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.17
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
24
        // DefaultPenaltyHalfLife is the default half-life duration. The
25
        // half-life duration defines after how much time a penalized node or
26
        // channel is back at 50% probability.
27
        DefaultPenaltyHalfLife = time.Hour
28

29
        // minSecondChanceInterval is the minimum time required between
30
        // second-chance failures.
31
        //
32
        // If nodes return a channel policy related failure, they may get a
33
        // second chance to forward the payment. It could be that the channel
34
        // policy that we are aware of is not up to date. This is especially
35
        // important in case of mobile apps that are mostly offline.
36
        //
37
        // However, we don't want to give nodes the option to endlessly return
38
        // new channel updates so that we are kept busy trying to route through
39
        // that node until the payment loop times out.
40
        //
41
        // Therefore we only grant a second chance to a node if the previous
42
        // second chance is sufficiently long ago. This is what
43
        // minSecondChanceInterval defines. If a second policy failure comes in
44
        // within that interval, we will apply a penalty.
45
        //
46
        // Second chances granted are tracked on the level of node pairs. This
47
        // means that if a node has multiple channels to the same peer, they
48
        // will only get a single second chance to route to that peer again.
49
        // Nodes forward non-strict, so it isn't necessary to apply a less
50
        // restrictive channel level tracking scheme here.
51
        minSecondChanceInterval = time.Minute
52

53
        // DefaultMaxMcHistory is the default maximum history size.
54
        DefaultMaxMcHistory = 1000
55

56
        // DefaultMcFlushInterval is the default interval we use to flush MC state
57
        // to the database.
58
        DefaultMcFlushInterval = time.Second
59

60
        // prevSuccessProbability is the assumed probability for node pairs that
61
        // successfully relayed the previous attempt.
62
        prevSuccessProbability = 0.95
63

64
        // DefaultAprioriWeight is the default a priori weight. See
65
        // MissionControlConfig for further explanation.
66
        DefaultAprioriWeight = 0.5
67

68
        // DefaultMinFailureRelaxInterval is the default minimum time that must
69
        // have passed since the previously recorded failure before the failure
70
        // amount may be raised.
71
        DefaultMinFailureRelaxInterval = time.Minute
72

73
        // DefaultFeeEstimationTimeout is the default value for
74
        // FeeEstimationTimeout. It defines the maximum duration that the
75
        // probing fee estimation is allowed to take.
76
        DefaultFeeEstimationTimeout = time.Minute
77

78
        // DefaultMissionControlNamespace is the name of the default mission
79
        // control name space. This is used as the sub-bucket key within the
80
        // top level DB bucket to store mission control results.
81
        DefaultMissionControlNamespace = "default"
82
)
83

84
var (
85
        // ErrInvalidMcHistory is returned if we get a negative mission control
86
        // history count.
87
        ErrInvalidMcHistory = errors.New("mission control history must be " +
88
                ">= 0")
89

90
        // ErrInvalidFailureInterval is returned if we get an invalid failure
91
        // interval.
92
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
93
)
94

95
// NodeResults contains previous results from a node to its peers.
96
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
99
// MissionControl instances and will be the same regardless of namespace.
100
type mcConfig struct {
101
        // clock is a time source used by mission control.
102
        clock clock.Clock
103

104
        // selfNode is our pubkey.
105
        selfNode route.Vertex
106
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
109
// routing by external callers when sending payments throughout the network. It
110
// acts as a shared memory during routing attempts with the goal to optimize the
111
// payment attempt success rate.
112
//
113
// Failed payment attempts are reported to mission control. These reports are
114
// used to track the time of the last node or channel level failure. The time
115
// since the last failure is used to estimate a success probability that is fed
116
// into the path finding process for subsequent payment attempts.
117
type MissionControl struct {
118
        cfg *mcConfig
119

120
        // state is the internal mission control state that is input for
121
        // probability estimation.
122
        state *missionControlState
123

124
        store *missionControlStore
125

126
        // estimator is the probability estimator that is used with the payment
127
        // results that mission control collects.
128
        estimator Estimator
129

130
        // onConfigUpdate is a function that is called whenever the
131
        // mission control state is updated.
132
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
133

134
        log btclog.Logger
135

136
        mu sync.Mutex
137
}
138

139
// MissionController manages MissionControl instances in various namespaces.
140
type MissionController struct {
141
        db           kvdb.Backend
142
        cfg          *mcConfig
143
        defaultMCCfg *MissionControlConfig
144

145
        mc map[string]*MissionControl
146
        mu sync.Mutex
147

148
        // TODO(roasbeef): further counters, if vertex continually unavailable,
149
        // add to another generation
150

151
        // TODO(roasbeef): also add favorable metrics for nodes
152
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
34✔
158

34✔
159
        m.mu.Lock()
34✔
160
        defer m.mu.Unlock()
34✔
161

34✔
162
        if mc, ok := m.mc[ns]; ok {
67✔
163
                return mc, nil
33✔
164
        }
33✔
165

166
        return m.initMissionControl(ns)
1✔
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
171
func (m *MissionController) ListNamespaces() []string {
3✔
172
        m.mu.Lock()
3✔
173
        defer m.mu.Unlock()
3✔
174

3✔
175
        namespaces := make([]string, 0, len(m.mc))
3✔
176
        for ns := range m.mc {
8✔
177
                namespaces = append(namespaces, ns)
5✔
178
        }
5✔
179

180
        return namespaces
3✔
181
}
182

183
// MissionControlConfig defines parameters that control mission control
184
// behaviour.
185
type MissionControlConfig struct {
186
        // Estimator gives probability estimates for node pairs.
187
        Estimator Estimator
188

189
        // OnConfigUpdate is function that is called whenever the
190
        // mission control state is updated.
191
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
192

193
        // MaxMcHistory defines the maximum number of payment results that are
194
        // held on disk.
195
        MaxMcHistory int
196

197
        // McFlushInterval defines the ticker interval when we flush the
198
        // accumulated state to the DB.
199
        McFlushInterval time.Duration
200

201
        // MinFailureRelaxInterval is the minimum time that must have passed
202
        // since the previously recorded failure before the failure amount may
203
        // be raised.
204
        MinFailureRelaxInterval time.Duration
205
}
206

207
func (c *MissionControlConfig) validate() error {
31✔
208
        if c.MaxMcHistory < 0 {
31✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
31✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
31✔
217
}
218

219
// String returns a string representation of a mission control config.
220
func (c *MissionControlConfig) String() string {
×
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
×
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
×
223
}
×
224

225
// TimedPairResult describes a timestamped pair result.
226
type TimedPairResult struct {
227
        // FailTime is the time of the last failure.
228
        FailTime time.Time
229

230
        // FailAmt is the amount of the last failure. This amount may be pushed
231
        // up if a later success is higher than the last failed amount.
232
        FailAmt lnwire.MilliSatoshi
233

234
        // SuccessTime is the time of the last success.
235
        SuccessTime time.Time
236

237
        // SuccessAmt is the highest amount that successfully forwarded. This
238
        // isn't necessarily the last success amount. The value of this field
239
        // may also be pushed down if a later failure is lower than the highest
240
        // success amount. Because of this, SuccessAmt may not match
241
        // SuccessTime.
242
        SuccessAmt lnwire.MilliSatoshi
243
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
246
// control.
247
type MissionControlSnapshot struct {
248
        // Pairs is a list of channels for which specific information is
249
        // logged.
250
        Pairs []MissionControlPairSnapshot
251
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
254
// state in mission control.
255
type MissionControlPairSnapshot struct {
256
        // Pair is the node pair of which the state is described.
257
        Pair DirectedNodePair
258

259
        // TimedPairResult contains the data for this pair.
260
        TimedPairResult
261
}
262

263
// paymentResult is the information that becomes available when a payment
264
// attempt completes.
265
type paymentResult struct {
266
        id        uint64
267
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
268
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
269
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
270

271
        // failure holds information related to the failure of a payment. The
272
        // presence of this record indicates a payment failure. The absence of
273
        // this record indicates a successful payment.
274
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
275
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure *paymentFailure) *paymentResult {
76✔
280

76✔
281
        result := &paymentResult{
76✔
282
                id: id,
76✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
284
                        uint64(timeFwd.UnixNano()),
76✔
285
                ),
76✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
76✔
287
                        uint64(timeReply.UnixNano()),
76✔
288
                ),
76✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
76✔
290
        }
76✔
291

76✔
292
        if failure != nil {
129✔
293
                result.failure = tlv.SomeRecordT(
53✔
294
                        tlv.NewRecordT[tlv.TlvType3](*failure),
53✔
295
                )
53✔
296
        }
53✔
297

298
        return result
76✔
299
}
300

301
// NewMissionController returns a new instance of MissionController.
302
func NewMissionController(db kvdb.Backend, self route.Vertex,
303
        cfg *MissionControlConfig) (*MissionController, error) {
31✔
304

31✔
305
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
31✔
306
                cfg.Estimator)
31✔
307

31✔
308
        if err := cfg.validate(); err != nil {
31✔
309
                return nil, err
×
310
        }
×
311

312
        mcCfg := &mcConfig{
31✔
313
                clock:    clock.NewDefaultClock(),
31✔
314
                selfNode: self,
31✔
315
        }
31✔
316

31✔
317
        mgr := &MissionController{
31✔
318
                db:           db,
31✔
319
                defaultMCCfg: cfg,
31✔
320
                cfg:          mcCfg,
31✔
321
                mc:           make(map[string]*MissionControl),
31✔
322
        }
31✔
323

31✔
324
        if err := mgr.loadMissionControls(); err != nil {
31✔
325
                return nil, err
×
326
        }
×
327

328
        for _, mc := range mgr.mc {
63✔
329
                if err := mc.init(); err != nil {
32✔
330
                        return nil, err
×
331
                }
×
332
        }
333

334
        return mgr, nil
31✔
335
}
336

337
// loadMissionControls initialises a MissionControl in the default namespace if
338
// one does not yet exist. It then initialises a MissionControl for all other
339
// namespaces found in the DB.
340
//
341
// NOTE: this should only be called once during MissionController construction.
342
func (m *MissionController) loadMissionControls() error {
31✔
343
        m.mu.Lock()
31✔
344
        defer m.mu.Unlock()
31✔
345

31✔
346
        // Always initialise the default namespace.
31✔
347
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
31✔
348
        if err != nil {
31✔
349
                return err
×
350
        }
×
351

352
        namespaces := make(map[string]struct{})
31✔
353
        err = m.db.View(func(tx walletdb.ReadTx) error {
62✔
354
                mcStoreBkt := tx.ReadBucket(resultsKey)
31✔
355
                if mcStoreBkt == nil {
31✔
356
                        return fmt.Errorf("top level mission control bucket " +
×
357
                                "not found")
×
358
                }
×
359

360
                // Iterate through all the keys in the bucket and collect the
361
                // namespaces.
362
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
63✔
363
                        // We've already initialised the default namespace so
32✔
364
                        // we can skip it.
32✔
365
                        if string(k) == DefaultMissionControlNamespace {
63✔
366
                                return nil
31✔
367
                        }
31✔
368

369
                        namespaces[string(k)] = struct{}{}
1✔
370

1✔
371
                        return nil
1✔
372
                })
373
        }, func() {})
31✔
374
        if err != nil {
31✔
375
                return err
×
376
        }
×
377

378
        // Now, iterate through all the namespaces and initialise them.
379
        for ns := range namespaces {
32✔
380
                _, err = m.initMissionControl(ns)
1✔
381
                if err != nil {
1✔
382
                        return err
×
383
                }
×
384
        }
385

386
        return nil
31✔
387
}
388

389
// initMissionControl creates a new MissionControl instance with the given
390
// namespace if one does not yet exist.
391
//
392
// NOTE: the MissionController's mutex must be held before calling this method.
393
func (m *MissionController) initMissionControl(namespace string) (
394
        *MissionControl, error) {
33✔
395

33✔
396
        // If a mission control with this namespace has already been initialised
33✔
397
        // then there is nothing left to do.
33✔
398
        if mc, ok := m.mc[namespace]; ok {
33✔
399
                return mc, nil
×
400
        }
×
401

402
        cfg := m.defaultMCCfg
33✔
403

33✔
404
        store, err := newMissionControlStore(
33✔
405
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
33✔
406
                cfg.McFlushInterval,
33✔
407
        )
33✔
408
        if err != nil {
33✔
409
                return nil, err
×
410
        }
×
411

412
        mc := &MissionControl{
33✔
413
                cfg: m.cfg,
33✔
414
                state: newMissionControlState(
33✔
415
                        cfg.MinFailureRelaxInterval,
33✔
416
                ),
33✔
417
                store:          store,
33✔
418
                estimator:      cfg.Estimator,
33✔
419
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
33✔
420
                onConfigUpdate: cfg.OnConfigUpdate,
33✔
421
        }
33✔
422

33✔
423
        m.mc[namespace] = mc
33✔
424

33✔
425
        return mc, nil
33✔
426
}
427

428
// RunStoreTickers runs the mission controller store's tickers.
429
func (m *MissionController) RunStoreTickers() {
×
430
        m.mu.Lock()
×
431
        defer m.mu.Unlock()
×
432

×
433
        for _, mc := range m.mc {
×
434
                mc.store.run()
×
435
        }
×
436
}
437

438
// StopStoreTickers stops the mission control store's tickers.
439
func (m *MissionController) StopStoreTickers() {
×
440
        log.Debug("Stopping mission control store ticker")
×
441
        defer log.Debug("Mission control store ticker stopped")
×
442

×
443
        m.mu.Lock()
×
444
        defer m.mu.Unlock()
×
445

×
446
        for _, mc := range m.mc {
×
447
                mc.store.stop()
×
448
        }
×
449
}
450

451
// init initializes mission control with historical data.
452
func (m *MissionControl) init() error {
32✔
453
        m.log.Debugf("Mission control state reconstruction started")
32✔
454

32✔
455
        m.mu.Lock()
32✔
456
        defer m.mu.Unlock()
32✔
457

32✔
458
        start := time.Now()
32✔
459

32✔
460
        results, err := m.store.fetchAll()
32✔
461
        if err != nil {
32✔
462
                return err
×
463
        }
×
464

465
        for _, result := range results {
36✔
466
                _ = m.applyPaymentResult(result)
4✔
467
        }
4✔
468

469
        m.log.Debugf("Mission control state reconstruction finished: "+
32✔
470
                "n=%v, time=%v", len(results), time.Since(start))
32✔
471

32✔
472
        return nil
32✔
473
}
474

475
// GetConfig returns the config that mission control is currently configured
476
// with. All fields are copied by value, so we do not need to worry about
477
// mutation.
478
func (m *MissionControl) GetConfig() *MissionControlConfig {
×
479
        m.mu.Lock()
×
480
        defer m.mu.Unlock()
×
481

×
482
        return &MissionControlConfig{
×
483
                Estimator:               m.estimator,
×
484
                MaxMcHistory:            m.store.maxRecords,
×
485
                McFlushInterval:         m.store.flushInterval,
×
486
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
×
487
        }
×
488
}
×
489

490
// SetConfig validates the config provided and updates mission control's config
491
// if it is valid.
492
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
×
493
        if cfg == nil {
×
494
                return errors.New("nil mission control config")
×
495
        }
×
496

497
        if err := cfg.validate(); err != nil {
×
498
                return err
×
499
        }
×
500

501
        m.mu.Lock()
×
502
        defer m.mu.Unlock()
×
503

×
504
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
×
505
                cfg.Estimator)
×
506

×
507
        m.store.maxRecords = cfg.MaxMcHistory
×
508
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
×
509
        m.estimator = cfg.Estimator
×
510

×
511
        // Execute the callback function if it is set.
×
512
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
×
513
                f(cfg)
×
514
        })
×
515

516
        return nil
×
517
}
518

519
// ResetHistory resets the history of MissionControl returning it to a state as
520
// if no payment attempts have been made.
521
func (m *MissionControl) ResetHistory() error {
3✔
522
        m.mu.Lock()
3✔
523
        defer m.mu.Unlock()
3✔
524

3✔
525
        if err := m.store.clear(); err != nil {
3✔
526
                return err
×
527
        }
×
528

529
        m.state.resetHistory()
3✔
530

3✔
531
        m.log.Debugf("Mission control history cleared")
3✔
532

3✔
533
        return nil
3✔
534
}
535

536
// GetProbability is expected to return the success probability of a payment
537
// from fromNode along edge.
538
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
539
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
662✔
540

662✔
541
        m.mu.Lock()
662✔
542
        defer m.mu.Unlock()
662✔
543

662✔
544
        now := m.cfg.clock.Now()
662✔
545
        results, _ := m.state.getLastPairResult(fromNode)
662✔
546

662✔
547
        // Use a distinct probability estimation function for local channels.
662✔
548
        if fromNode == m.cfg.selfNode {
701✔
549
                return m.estimator.LocalPairProbability(now, results, toNode)
39✔
550
        }
39✔
551

552
        return m.estimator.PairProbability(
623✔
553
                now, results, toNode, amt, capacity,
623✔
554
        )
623✔
555
}
556

557
// GetHistorySnapshot takes a snapshot from the current mission control state
558
// and actual probability estimates.
559
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
1✔
560
        m.mu.Lock()
1✔
561
        defer m.mu.Unlock()
1✔
562

1✔
563
        m.log.Debugf("Requesting history snapshot from mission control")
1✔
564

1✔
565
        return m.state.getSnapshot()
1✔
566
}
1✔
567

568
// ImportHistory imports the set of mission control results provided to our
569
// in-memory state. These results are not persisted, so will not survive
570
// restarts.
571
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
572
        force bool) error {
×
573

×
574
        if history == nil {
×
575
                return errors.New("cannot import nil history")
×
576
        }
×
577

578
        m.mu.Lock()
×
579
        defer m.mu.Unlock()
×
580

×
581
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
×
582
                "control", len(history.Pairs))
×
583

×
584
        imported := m.state.importSnapshot(history, force)
×
585

×
586
        m.log.Infof("Imported %v results to mission control", imported)
×
587

×
588
        return nil
×
589
}
590

591
// GetPairHistorySnapshot returns the stored history for a given node pair.
592
func (m *MissionControl) GetPairHistorySnapshot(
593
        fromNode, toNode route.Vertex) TimedPairResult {
×
594

×
595
        m.mu.Lock()
×
596
        defer m.mu.Unlock()
×
597

×
598
        results, ok := m.state.getLastPairResult(fromNode)
×
599
        if !ok {
×
600
                return TimedPairResult{}
×
601
        }
×
602

603
        result, ok := results[toNode]
×
604
        if !ok {
×
605
                return TimedPairResult{}
×
606
        }
×
607

608
        return result
×
609
}
610

611
// ReportPaymentFail reports a failed payment to mission control as input for
612
// future probability estimates. The failureSourceIdx argument indicates the
613
// failure source. If it is nil, the failure source is unknown. This function
614
// returns a reason if this failure is a final failure. In that case no further
615
// payment attempts need to be made.
616
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
617
        failureSourceIdx *int, failure lnwire.FailureMessage) (
618
        *channeldb.FailureReason, error) {
46✔
619

46✔
620
        timestamp := m.cfg.clock.Now()
46✔
621

46✔
622
        result := newPaymentResult(
46✔
623
                paymentID, extractMCRoute(rt), timestamp, timestamp,
46✔
624
                newPaymentFailure(failureSourceIdx, failure),
46✔
625
        )
46✔
626

46✔
627
        return m.processPaymentResult(result)
46✔
628
}
46✔
629

630
// ReportPaymentSuccess reports a successful payment to mission control as input
631
// for future probability estimates.
632
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
633
        rt *route.Route) error {
22✔
634

22✔
635
        timestamp := m.cfg.clock.Now()
22✔
636

22✔
637
        result := newPaymentResult(
22✔
638
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
22✔
639
        )
22✔
640

22✔
641
        _, err := m.processPaymentResult(result)
22✔
642

22✔
643
        return err
22✔
644
}
22✔
645

646
// processPaymentResult stores a payment result in the mission control store and
647
// updates mission control's in-memory state.
648
func (m *MissionControl) processPaymentResult(result *paymentResult) (
649
        *channeldb.FailureReason, error) {
68✔
650

68✔
651
        // Store complete result in database.
68✔
652
        m.store.AddResult(result)
68✔
653

68✔
654
        m.mu.Lock()
68✔
655
        defer m.mu.Unlock()
68✔
656

68✔
657
        // Apply result to update mission control state.
68✔
658
        reason := m.applyPaymentResult(result)
68✔
659

68✔
660
        return reason, nil
68✔
661
}
68✔
662

663
// applyPaymentResult applies a payment result as input for future probability
664
// estimates. It returns a bool indicating whether this error is a final error
665
// and no further payment attempts need to be made.
666
func (m *MissionControl) applyPaymentResult(
667
        result *paymentResult) *channeldb.FailureReason {
72✔
668

72✔
669
        // Interpret result.
72✔
670
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
72✔
671

72✔
672
        if i.policyFailure != nil {
81✔
673
                if m.state.requestSecondChance(
9✔
674
                        time.Unix(0, int64(result.timeReply.Val)),
9✔
675
                        i.policyFailure.From, i.policyFailure.To,
9✔
676
                ) {
15✔
677
                        return nil
6✔
678
                }
6✔
679
        }
680

681
        // If there is a node-level failure, record a failure for every tried
682
        // connection of that node. A node-level failure can be considered as a
683
        // failure that would have occurred with any of the node's channels.
684
        //
685
        // Ideally we'd also record the failure for the untried connections of
686
        // the node. Unfortunately this would require access to the graph and
687
        // adding this dependency and db calls does not outweigh the benefits.
688
        //
689
        // Untried connections will fall back to the node probability. After the
690
        // call to setAllPairResult below, the node probability will be equal to
691
        // the probability of the tried channels except that the a priori
692
        // probability is mixed in too. This effect is controlled by the
693
        // aprioriWeight parameter. If that parameter isn't set to an extreme
694
        // and there are a few known connections, there shouldn't be much of a
695
        // difference. The largest difference occurs when aprioriWeight is 1. In
696
        // that case, a node-level failure would not be applied to untried
697
        // channels.
698
        if i.nodeFailure != nil {
72✔
699
                m.log.Debugf("Reporting node failure to Mission Control: "+
6✔
700
                        "node=%v", *i.nodeFailure)
6✔
701

6✔
702
                m.state.setAllFail(
6✔
703
                        *i.nodeFailure,
6✔
704
                        time.Unix(0, int64(result.timeReply.Val)),
6✔
705
                )
6✔
706
        }
6✔
707

708
        for pair, pairResult := range i.pairResults {
212✔
709
                pairResult := pairResult
146✔
710

146✔
711
                if pairResult.success {
227✔
712
                        m.log.Debugf("Reporting pair success to Mission "+
81✔
713
                                "Control: pair=%v, amt=%v",
81✔
714
                                pair, pairResult.amt)
81✔
715
                } else {
146✔
716
                        m.log.Debugf("Reporting pair failure to Mission "+
65✔
717
                                "Control: pair=%v, amt=%v",
65✔
718
                                pair, pairResult.amt)
65✔
719
                }
65✔
720

721
                m.state.setLastPairResult(
146✔
722
                        pair.From, pair.To,
146✔
723
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
146✔
724
                        false,
146✔
725
                )
146✔
726
        }
727

728
        return i.finalFailureReason
66✔
729
}
730

731
// namespacedDB is an implementation of the missionControlDB that gives a user
732
// of the interface access to a namespaced bucket within the top level mission
733
// control bucket.
734
type namespacedDB struct {
735
        topLevelBucketKey []byte
736
        namespace         []byte
737
        db                kvdb.Backend
738
}
739

740
// A compile-time check to ensure that namespacedDB implements missionControlDB.
741
var _ missionControlDB = (*namespacedDB)(nil)
742

743
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
744
// default namespace.
745
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
4✔
746
        return newNamespacedDB(db, DefaultMissionControlNamespace)
4✔
747
}
4✔
748

749
// newNamespacedDB creates a new instance of missionControlDB where the DB will
750
// have access to a namespaced bucket within the top level mission control
751
// bucket.
752
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
37✔
753
        return &namespacedDB{
37✔
754
                db:                db,
37✔
755
                namespace:         []byte(namespace),
37✔
756
                topLevelBucketKey: resultsKey,
37✔
757
        }
37✔
758
}
37✔
759

760
// update can be used to perform reads and writes on the given bucket.
761
//
762
// NOTE: this is part of the missionControlDB interface.
763
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
764
        reset func()) error {
46✔
765

46✔
766
        return n.db.Update(func(tx kvdb.RwTx) error {
92✔
767
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
46✔
768
                if err != nil {
46✔
769
                        return fmt.Errorf("cannot create top level mission "+
×
770
                                "control bucket: %w", err)
×
771
                }
×
772

773
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
46✔
774
                        n.namespace,
46✔
775
                )
46✔
776
                if err != nil {
46✔
777
                        return fmt.Errorf("cannot create namespaced bucket "+
×
778
                                "(%s) in mission control store: %w",
×
779
                                n.namespace, err)
×
780
                }
×
781

782
                return f(namespacedBkt)
46✔
783
        }, reset)
784
}
785

786
// view can be used to perform reads on the given bucket.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
790
        reset func()) error {
47✔
791

47✔
792
        return n.db.View(func(tx kvdb.RTx) error {
94✔
793
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
47✔
794
                if mcStoreBkt == nil {
47✔
795
                        return fmt.Errorf("top level mission control bucket " +
×
796
                                "not found")
×
797
                }
×
798

799
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
47✔
800
                if namespacedBkt == nil {
47✔
801
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
802
                                "in mission control store", n.namespace)
×
803
                }
×
804

805
                return f(namespacedBkt)
47✔
806
        }, reset)
807
}
808

809
// purge will delete all the contents in the namespace.
810
//
811
// NOTE: this is part of the missionControlDB interface.
812
func (n *namespacedDB) purge() error {
3✔
813
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
814
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
815
                if mcStoreBkt == nil {
3✔
816
                        return nil
×
817
                }
×
818

819
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
820
                if err != nil {
3✔
821
                        return err
×
822
                }
×
823

824
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
825

3✔
826
                return err
3✔
827
        }, func() {})
3✔
828
}
829

830
// paymentFailure represents the presence of a payment failure. It may or may
831
// not include additional information about said failure.
832
type paymentFailure struct {
833
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
834
}
835

836
// newPaymentFailure constructs a new paymentFailure struct. If the source
837
// index is nil, then an empty paymentFailure is returned. This represents a
838
// failure with unknown details. Otherwise, the index and failure message are
839
// used to populate the info field of the paymentFailure.
840
func newPaymentFailure(sourceIdx *int,
841
        failureMsg lnwire.FailureMessage) *paymentFailure {
77✔
842

77✔
843
        if sourceIdx == nil {
78✔
844
                return &paymentFailure{}
1✔
845
        }
1✔
846

847
        info := paymentFailureInfo{
76✔
848
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
849
                        uint8(*sourceIdx),
76✔
850
                ),
76✔
851
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
76✔
852
        }
76✔
853

76✔
854
        return &paymentFailure{
76✔
855
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
76✔
856
        }
76✔
857
}
858

859
// Record returns a TLV record that can be used to encode/decode a
860
// paymentFailure to/from a TLV stream.
861
func (r *paymentFailure) Record() tlv.Record {
34✔
862
        recordSize := func() uint64 {
45✔
863
                var (
11✔
864
                        b   bytes.Buffer
11✔
865
                        buf [8]byte
11✔
866
                )
11✔
867
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
11✔
868
                        panic(err)
×
869
                }
870

871
                return uint64(len(b.Bytes()))
11✔
872
        }
873

874
        return tlv.MakeDynamicRecord(
34✔
875
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
34✔
876
        )
34✔
877
}
878

879
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
22✔
880
        if v, ok := val.(*paymentFailure); ok {
44✔
881
                var recordProducers []tlv.RecordProducer
22✔
882
                v.info.WhenSome(
22✔
883
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
44✔
884
                                recordProducers = append(recordProducers, &r)
22✔
885
                        },
22✔
886
                )
887

888
                return lnwire.EncodeRecordsTo(
22✔
889
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
22✔
890
                )
22✔
891
        }
892

893
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
894
}
895

896
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
897
        l uint64) error {
21✔
898

21✔
899
        if v, ok := val.(*paymentFailure); ok {
42✔
900
                var h paymentFailure
21✔
901

21✔
902
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
21✔
903
                typeMap, err := lnwire.DecodeRecords(
21✔
904
                        r, lnwire.ProduceRecordsSorted(&info)...,
21✔
905
                )
21✔
906
                if err != nil {
21✔
907
                        return err
×
908
                }
×
909

910
                if _, ok := typeMap[h.info.TlvType()]; ok {
42✔
911
                        h.info = tlv.SomeRecordT(info)
21✔
912
                }
21✔
913

914
                *v = h
21✔
915

21✔
916
                return nil
21✔
917
        }
918

919
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
920
}
921

922
// paymentFailureInfo holds additional information about a payment failure.
923
type paymentFailureInfo struct {
924
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
925
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
926
}
927

928
// Record returns a TLV record that can be used to encode/decode a
929
// paymentFailureInfo to/from a TLV stream.
930
func (r *paymentFailureInfo) Record() tlv.Record {
43✔
931
        recordSize := func() uint64 {
65✔
932
                var (
22✔
933
                        b   bytes.Buffer
22✔
934
                        buf [8]byte
22✔
935
                )
22✔
936
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
22✔
937
                        panic(err)
×
938
                }
939

940
                return uint64(len(b.Bytes()))
22✔
941
        }
942

943
        return tlv.MakeDynamicRecord(
43✔
944
                0, r, recordSize, encodePaymentFailureInfo,
43✔
945
                decodePaymentFailureInfo,
43✔
946
        )
43✔
947
}
948

949
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
44✔
950
        if v, ok := val.(*paymentFailureInfo); ok {
88✔
951
                return lnwire.EncodeRecordsTo(
44✔
952
                        w, lnwire.ProduceRecordsSorted(
44✔
953
                                &v.sourceIdx, &v.msg,
44✔
954
                        ),
44✔
955
                )
44✔
956
        }
44✔
957

958
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
959
}
960

961
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
962
        l uint64) error {
21✔
963

21✔
964
        if v, ok := val.(*paymentFailureInfo); ok {
42✔
965
                var h paymentFailureInfo
21✔
966

21✔
967
                _, err := lnwire.DecodeRecords(
21✔
968
                        r,
21✔
969
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
21✔
970
                )
21✔
971
                if err != nil {
21✔
972
                        return err
×
973
                }
×
974

975
                *v = h
21✔
976

21✔
977
                return nil
21✔
978
        }
979

980
        return tlv.NewTypeForDecodingErr(
×
981
                val, "routing.paymentFailureInfo", l, l,
×
982
        )
×
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc