• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.77
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
24
        // DefaultPenaltyHalfLife is the default half-life duration. The
25
        // half-life duration defines after how much time a penalized node or
26
        // channel is back at 50% probability.
27
        DefaultPenaltyHalfLife = time.Hour
28

29
        // minSecondChanceInterval is the minimum time required between
30
        // second-chance failures.
31
        //
32
        // If nodes return a channel policy related failure, they may get a
33
        // second chance to forward the payment. It could be that the channel
34
        // policy that we are aware of is not up to date. This is especially
35
        // important in case of mobile apps that are mostly offline.
36
        //
37
        // However, we don't want to give nodes the option to endlessly return
38
        // new channel updates so that we are kept busy trying to route through
39
        // that node until the payment loop times out.
40
        //
41
        // Therefore we only grant a second chance to a node if the previous
42
        // second chance is sufficiently long ago. This is what
43
        // minSecondChanceInterval defines. If a second policy failure comes in
44
        // within that interval, we will apply a penalty.
45
        //
46
        // Second chances granted are tracked on the level of node pairs. This
47
        // means that if a node has multiple channels to the same peer, they
48
        // will only get a single second chance to route to that peer again.
49
        // Nodes forward non-strict, so it isn't necessary to apply a less
50
        // restrictive channel level tracking scheme here.
51
        minSecondChanceInterval = time.Minute
52

53
        // DefaultMaxMcHistory is the default maximum history size.
54
        DefaultMaxMcHistory = 1000
55

56
        // DefaultMcFlushInterval is the default interval we use to flush MC state
57
        // to the database.
58
        DefaultMcFlushInterval = time.Second
59

60
        // prevSuccessProbability is the assumed probability for node pairs that
61
        // successfully relayed the previous attempt.
62
        prevSuccessProbability = 0.95
63

64
        // DefaultAprioriWeight is the default a priori weight. See
65
        // MissionControlConfig for further explanation.
66
        DefaultAprioriWeight = 0.5
67

68
        // DefaultMinFailureRelaxInterval is the default minimum time that must
69
        // have passed since the previously recorded failure before the failure
70
        // amount may be raised.
71
        DefaultMinFailureRelaxInterval = time.Minute
72

73
        // DefaultFeeEstimationTimeout is the default value for
74
        // FeeEstimationTimeout. It defines the maximum duration that the
75
        // probing fee estimation is allowed to take.
76
        DefaultFeeEstimationTimeout = time.Minute
77

78
        // DefaultMissionControlNamespace is the name of the default mission
79
        // control name space. This is used as the sub-bucket key within the
80
        // top level DB bucket to store mission control results.
81
        DefaultMissionControlNamespace = "default"
82
)
83

84
var (
85
        // ErrInvalidMcHistory is returned if we get a negative mission control
86
        // history count.
87
        ErrInvalidMcHistory = errors.New("mission control history must be " +
88
                ">= 0")
89

90
        // ErrInvalidFailureInterval is returned if we get an invalid failure
91
        // interval.
92
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
93
)
94

95
// NodeResults contains previous results from a node to its peers.
96
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
99
// MissionControl instances and will be the same regardless of namespace.
100
type mcConfig struct {
101
        // clock is a time source used by mission control.
102
        clock clock.Clock
103

104
        // selfNode is our pubkey.
105
        selfNode route.Vertex
106
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
109
// routing by external callers when sending payments throughout the network. It
110
// acts as a shared memory during routing attempts with the goal to optimize the
111
// payment attempt success rate.
112
//
113
// Failed payment attempts are reported to mission control. These reports are
114
// used to track the time of the last node or channel level failure. The time
115
// since the last failure is used to estimate a success probability that is fed
116
// into the path finding process for subsequent payment attempts.
117
type MissionControl struct {
118
        cfg *mcConfig
119

120
        // state is the internal mission control state that is input for
121
        // probability estimation.
122
        state *missionControlState
123

124
        store *missionControlStore
125

126
        // estimator is the probability estimator that is used with the payment
127
        // results that mission control collects.
128
        estimator Estimator
129

130
        // onConfigUpdate is a function that is called whenever the
131
        // mission control state is updated.
132
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
133

134
        log btclog.Logger
135

136
        mu sync.Mutex
137
}
138

139
// MissionController manages MissionControl instances in various namespaces.
140
type MissionController struct {
141
        db           kvdb.Backend
142
        cfg          *mcConfig
143
        defaultMCCfg *MissionControlConfig
144

145
        mc map[string]*MissionControl
146
        mu sync.Mutex
147

148
        // TODO(roasbeef): further counters, if vertex continually unavailable,
149
        // add to another generation
150

151
        // TODO(roasbeef): also add favorable metrics for nodes
152
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
3✔
158

3✔
159
        m.mu.Lock()
3✔
160
        defer m.mu.Unlock()
3✔
161

3✔
162
        if mc, ok := m.mc[ns]; ok {
6✔
163
                return mc, nil
3✔
164
        }
3✔
165

UNCOV
166
        return m.initMissionControl(ns)
×
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
UNCOV
171
func (m *MissionController) ListNamespaces() []string {
×
UNCOV
172
        m.mu.Lock()
×
UNCOV
173
        defer m.mu.Unlock()
×
UNCOV
174

×
UNCOV
175
        namespaces := make([]string, 0, len(m.mc))
×
UNCOV
176
        for ns := range m.mc {
×
UNCOV
177
                namespaces = append(namespaces, ns)
×
UNCOV
178
        }
×
179

UNCOV
180
        return namespaces
×
181
}
182

183
// MissionControlConfig defines parameters that control mission control
184
// behaviour.
185
type MissionControlConfig struct {
186
        // Estimator gives probability estimates for node pairs.
187
        Estimator Estimator
188

189
        // OnConfigUpdate is function that is called whenever the
190
        // mission control state is updated.
191
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
192

193
        // MaxMcHistory defines the maximum number of payment results that are
194
        // held on disk.
195
        MaxMcHistory int
196

197
        // McFlushInterval defines the ticker interval when we flush the
198
        // accumulated state to the DB.
199
        McFlushInterval time.Duration
200

201
        // MinFailureRelaxInterval is the minimum time that must have passed
202
        // since the previously recorded failure before the failure amount may
203
        // be raised.
204
        MinFailureRelaxInterval time.Duration
205
}
206

207
func (c *MissionControlConfig) validate() error {
3✔
208
        if c.MaxMcHistory < 0 {
3✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
3✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
3✔
217
}
218

219
// String returns a string representation of a mission control config.
220
func (c *MissionControlConfig) String() string {
3✔
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
3✔
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
3✔
223
}
3✔
224

225
// TimedPairResult describes a timestamped pair result.
226
type TimedPairResult struct {
227
        // FailTime is the time of the last failure.
228
        FailTime time.Time
229

230
        // FailAmt is the amount of the last failure. This amount may be pushed
231
        // up if a later success is higher than the last failed amount.
232
        FailAmt lnwire.MilliSatoshi
233

234
        // SuccessTime is the time of the last success.
235
        SuccessTime time.Time
236

237
        // SuccessAmt is the highest amount that successfully forwarded. This
238
        // isn't necessarily the last success amount. The value of this field
239
        // may also be pushed down if a later failure is lower than the highest
240
        // success amount. Because of this, SuccessAmt may not match
241
        // SuccessTime.
242
        SuccessAmt lnwire.MilliSatoshi
243
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
246
// control.
247
type MissionControlSnapshot struct {
248
        // Pairs is a list of channels for which specific information is
249
        // logged.
250
        Pairs []MissionControlPairSnapshot
251
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
254
// state in mission control.
255
type MissionControlPairSnapshot struct {
256
        // Pair is the node pair of which the state is described.
257
        Pair DirectedNodePair
258

259
        // TimedPairResult contains the data for this pair.
260
        TimedPairResult
261
}
262

263
// paymentResult is the information that becomes available when a payment
264
// attempt completes.
265
type paymentResult struct {
266
        id        uint64
267
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
268
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
269
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
270

271
        // failure holds information related to the failure of a payment. The
272
        // presence of this record indicates a payment failure. The absence of
273
        // this record indicates a successful payment.
274
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
275
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure fn.Option[paymentFailure]) *paymentResult {
3✔
280

3✔
281
        result := &paymentResult{
3✔
282
                id: id,
3✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
3✔
284
                        uint64(timeFwd.UnixNano()),
3✔
285
                ),
3✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
3✔
287
                        uint64(timeReply.UnixNano()),
3✔
288
                ),
3✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
3✔
290
        }
3✔
291

3✔
292
        failure.WhenSome(
3✔
293
                func(f paymentFailure) {
6✔
294
                        result.failure = tlv.SomeRecordT(
3✔
295
                                tlv.NewRecordT[tlv.TlvType3](f),
3✔
296
                        )
3✔
297
                },
3✔
298
        )
299

300
        return result
3✔
301
}
302

303
// NewMissionController returns a new instance of MissionController.
304
func NewMissionController(db kvdb.Backend, self route.Vertex,
305
        cfg *MissionControlConfig) (*MissionController, error) {
3✔
306

3✔
307
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
3✔
308
                cfg.Estimator)
3✔
309

3✔
310
        if err := cfg.validate(); err != nil {
3✔
311
                return nil, err
×
312
        }
×
313

314
        mcCfg := &mcConfig{
3✔
315
                clock:    clock.NewDefaultClock(),
3✔
316
                selfNode: self,
3✔
317
        }
3✔
318

3✔
319
        mgr := &MissionController{
3✔
320
                db:           db,
3✔
321
                defaultMCCfg: cfg,
3✔
322
                cfg:          mcCfg,
3✔
323
                mc:           make(map[string]*MissionControl),
3✔
324
        }
3✔
325

3✔
326
        if err := mgr.loadMissionControls(); err != nil {
3✔
327
                return nil, err
×
328
        }
×
329

330
        for _, mc := range mgr.mc {
6✔
331
                if err := mc.init(); err != nil {
3✔
332
                        return nil, err
×
333
                }
×
334
        }
335

336
        return mgr, nil
3✔
337
}
338

339
// loadMissionControls initialises a MissionControl in the default namespace if
340
// one does not yet exist. It then initialises a MissionControl for all other
341
// namespaces found in the DB.
342
//
343
// NOTE: this should only be called once during MissionController construction.
344
func (m *MissionController) loadMissionControls() error {
3✔
345
        m.mu.Lock()
3✔
346
        defer m.mu.Unlock()
3✔
347

3✔
348
        // Always initialise the default namespace.
3✔
349
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
3✔
350
        if err != nil {
3✔
351
                return err
×
352
        }
×
353

354
        namespaces := make(map[string]struct{})
3✔
355
        err = m.db.View(func(tx walletdb.ReadTx) error {
6✔
356
                mcStoreBkt := tx.ReadBucket(resultsKey)
3✔
357
                if mcStoreBkt == nil {
3✔
358
                        return fmt.Errorf("top level mission control bucket " +
×
359
                                "not found")
×
360
                }
×
361

362
                // Iterate through all the keys in the bucket and collect the
363
                // namespaces.
364
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
6✔
365
                        // We've already initialised the default namespace so
3✔
366
                        // we can skip it.
3✔
367
                        if string(k) == DefaultMissionControlNamespace {
6✔
368
                                return nil
3✔
369
                        }
3✔
370

UNCOV
371
                        namespaces[string(k)] = struct{}{}
×
UNCOV
372

×
UNCOV
373
                        return nil
×
374
                })
375
        }, func() {})
3✔
376
        if err != nil {
3✔
377
                return err
×
378
        }
×
379

380
        // Now, iterate through all the namespaces and initialise them.
381
        for ns := range namespaces {
3✔
UNCOV
382
                _, err = m.initMissionControl(ns)
×
UNCOV
383
                if err != nil {
×
384
                        return err
×
385
                }
×
386
        }
387

388
        return nil
3✔
389
}
390

391
// initMissionControl creates a new MissionControl instance with the given
392
// namespace if one does not yet exist.
393
//
394
// NOTE: the MissionController's mutex must be held before calling this method.
395
func (m *MissionController) initMissionControl(namespace string) (
396
        *MissionControl, error) {
3✔
397

3✔
398
        // If a mission control with this namespace has already been initialised
3✔
399
        // then there is nothing left to do.
3✔
400
        if mc, ok := m.mc[namespace]; ok {
3✔
401
                return mc, nil
×
402
        }
×
403

404
        cfg := m.defaultMCCfg
3✔
405

3✔
406
        store, err := newMissionControlStore(
3✔
407
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
3✔
408
                cfg.McFlushInterval,
3✔
409
        )
3✔
410
        if err != nil {
3✔
411
                return nil, err
×
412
        }
×
413

414
        mc := &MissionControl{
3✔
415
                cfg: m.cfg,
3✔
416
                state: newMissionControlState(
3✔
417
                        cfg.MinFailureRelaxInterval,
3✔
418
                ),
3✔
419
                store:          store,
3✔
420
                estimator:      cfg.Estimator,
3✔
421
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
3✔
422
                onConfigUpdate: cfg.OnConfigUpdate,
3✔
423
        }
3✔
424

3✔
425
        m.mc[namespace] = mc
3✔
426

3✔
427
        return mc, nil
3✔
428
}
429

430
// RunStoreTickers runs the mission controller store's tickers.
431
func (m *MissionController) RunStoreTickers() {
3✔
432
        m.mu.Lock()
3✔
433
        defer m.mu.Unlock()
3✔
434

3✔
435
        for _, mc := range m.mc {
6✔
436
                mc.store.run()
3✔
437
        }
3✔
438
}
439

440
// StopStoreTickers stops the mission control store's tickers.
441
func (m *MissionController) StopStoreTickers() {
3✔
442
        log.Debug("Stopping mission control store ticker")
3✔
443
        defer log.Debug("Mission control store ticker stopped")
3✔
444

3✔
445
        m.mu.Lock()
3✔
446
        defer m.mu.Unlock()
3✔
447

3✔
448
        for _, mc := range m.mc {
6✔
449
                mc.store.stop()
3✔
450
        }
3✔
451
}
452

453
// init initializes mission control with historical data.
454
func (m *MissionControl) init() error {
3✔
455
        m.log.Debugf("Mission control state reconstruction started")
3✔
456

3✔
457
        m.mu.Lock()
3✔
458
        defer m.mu.Unlock()
3✔
459

3✔
460
        start := time.Now()
3✔
461

3✔
462
        results, err := m.store.fetchAll()
3✔
463
        if err != nil {
3✔
464
                return err
×
465
        }
×
466

467
        for _, result := range results {
6✔
468
                _ = m.applyPaymentResult(result)
3✔
469
        }
3✔
470

471
        m.log.Debugf("Mission control state reconstruction finished: "+
3✔
472
                "n=%v, time=%v", len(results), time.Since(start))
3✔
473

3✔
474
        return nil
3✔
475
}
476

477
// GetConfig returns the config that mission control is currently configured
478
// with. All fields are copied by value, so we do not need to worry about
479
// mutation.
480
func (m *MissionControl) GetConfig() *MissionControlConfig {
3✔
481
        m.mu.Lock()
3✔
482
        defer m.mu.Unlock()
3✔
483

3✔
484
        return &MissionControlConfig{
3✔
485
                Estimator:               m.estimator,
3✔
486
                MaxMcHistory:            m.store.maxRecords,
3✔
487
                McFlushInterval:         m.store.flushInterval,
3✔
488
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
3✔
489
        }
3✔
490
}
3✔
491

492
// SetConfig validates the config provided and updates mission control's config
493
// if it is valid.
494
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
3✔
495
        if cfg == nil {
3✔
496
                return errors.New("nil mission control config")
×
497
        }
×
498

499
        if err := cfg.validate(); err != nil {
3✔
500
                return err
×
501
        }
×
502

503
        m.mu.Lock()
3✔
504
        defer m.mu.Unlock()
3✔
505

3✔
506
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
3✔
507
                cfg.Estimator)
3✔
508

3✔
509
        m.store.maxRecords = cfg.MaxMcHistory
3✔
510
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
3✔
511
        m.estimator = cfg.Estimator
3✔
512

3✔
513
        // Execute the callback function if it is set.
3✔
514
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
6✔
515
                f(cfg)
3✔
516
        })
3✔
517

518
        return nil
3✔
519
}
520

521
// ResetHistory resets the history of MissionControl returning it to a state as
522
// if no payment attempts have been made.
523
func (m *MissionControl) ResetHistory() error {
3✔
524
        m.mu.Lock()
3✔
525
        defer m.mu.Unlock()
3✔
526

3✔
527
        if err := m.store.clear(); err != nil {
3✔
528
                return err
×
529
        }
×
530

531
        m.state.resetHistory()
3✔
532

3✔
533
        m.log.Debugf("Mission control history cleared")
3✔
534

3✔
535
        return nil
3✔
536
}
537

538
// GetProbability is expected to return the success probability of a payment
539
// from fromNode along edge.
540
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
541
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
3✔
542

3✔
543
        m.mu.Lock()
3✔
544
        defer m.mu.Unlock()
3✔
545

3✔
546
        now := m.cfg.clock.Now()
3✔
547
        results, _ := m.state.getLastPairResult(fromNode)
3✔
548

3✔
549
        // Use a distinct probability estimation function for local channels.
3✔
550
        if fromNode == m.cfg.selfNode {
6✔
551
                return m.estimator.LocalPairProbability(now, results, toNode)
3✔
552
        }
3✔
553

554
        return m.estimator.PairProbability(
3✔
555
                now, results, toNode, amt, capacity,
3✔
556
        )
3✔
557
}
558

559
// GetHistorySnapshot takes a snapshot from the current mission control state
560
// and actual probability estimates.
UNCOV
561
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
×
UNCOV
562
        m.mu.Lock()
×
UNCOV
563
        defer m.mu.Unlock()
×
UNCOV
564

×
UNCOV
565
        m.log.Debugf("Requesting history snapshot from mission control")
×
UNCOV
566

×
UNCOV
567
        return m.state.getSnapshot()
×
UNCOV
568
}
×
569

570
// ImportHistory imports the set of mission control results provided to our
571
// in-memory state. These results are not persisted, so will not survive
572
// restarts.
573
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
574
        force bool) error {
3✔
575

3✔
576
        if history == nil {
3✔
577
                return errors.New("cannot import nil history")
×
578
        }
×
579

580
        m.mu.Lock()
3✔
581
        defer m.mu.Unlock()
3✔
582

3✔
583
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
3✔
584
                "control", len(history.Pairs))
3✔
585

3✔
586
        imported := m.state.importSnapshot(history, force)
3✔
587

3✔
588
        m.log.Infof("Imported %v results to mission control", imported)
3✔
589

3✔
590
        return nil
3✔
591
}
592

593
// GetPairHistorySnapshot returns the stored history for a given node pair.
594
func (m *MissionControl) GetPairHistorySnapshot(
595
        fromNode, toNode route.Vertex) TimedPairResult {
3✔
596

3✔
597
        m.mu.Lock()
3✔
598
        defer m.mu.Unlock()
3✔
599

3✔
600
        results, ok := m.state.getLastPairResult(fromNode)
3✔
601
        if !ok {
6✔
602
                return TimedPairResult{}
3✔
603
        }
3✔
604

605
        result, ok := results[toNode]
3✔
606
        if !ok {
3✔
607
                return TimedPairResult{}
×
608
        }
×
609

610
        return result
3✔
611
}
612

613
// ReportPaymentFail reports a failed payment to mission control as input for
614
// future probability estimates. The failureSourceIdx argument indicates the
615
// failure source. If it is nil, the failure source is unknown. This function
616
// returns a reason if this failure is a final failure. In that case no further
617
// payment attempts need to be made.
618
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
619
        failureSourceIdx *int, failure lnwire.FailureMessage) (
620
        *channeldb.FailureReason, error) {
3✔
621

3✔
622
        timestamp := m.cfg.clock.Now()
3✔
623

3✔
624
        result := newPaymentResult(
3✔
625
                paymentID, extractMCRoute(rt), timestamp, timestamp,
3✔
626
                fn.Some(newPaymentFailure(failureSourceIdx, failure)),
3✔
627
        )
3✔
628

3✔
629
        return m.processPaymentResult(result)
3✔
630
}
3✔
631

632
// ReportPaymentSuccess reports a successful payment to mission control as input
633
// for future probability estimates.
634
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
635
        rt *route.Route) error {
3✔
636

3✔
637
        timestamp := m.cfg.clock.Now()
3✔
638

3✔
639
        result := newPaymentResult(
3✔
640
                paymentID, extractMCRoute(rt), timestamp, timestamp,
3✔
641
                fn.None[paymentFailure](),
3✔
642
        )
3✔
643

3✔
644
        _, err := m.processPaymentResult(result)
3✔
645

3✔
646
        return err
3✔
647
}
3✔
648

649
// processPaymentResult stores a payment result in the mission control store and
650
// updates mission control's in-memory state.
651
func (m *MissionControl) processPaymentResult(result *paymentResult) (
652
        *channeldb.FailureReason, error) {
3✔
653

3✔
654
        // Store complete result in database.
3✔
655
        m.store.AddResult(result)
3✔
656

3✔
657
        m.mu.Lock()
3✔
658
        defer m.mu.Unlock()
3✔
659

3✔
660
        // Apply result to update mission control state.
3✔
661
        reason := m.applyPaymentResult(result)
3✔
662

3✔
663
        return reason, nil
3✔
664
}
3✔
665

666
// applyPaymentResult applies a payment result as input for future probability
667
// estimates. It returns a bool indicating whether this error is a final error
668
// and no further payment attempts need to be made.
669
func (m *MissionControl) applyPaymentResult(
670
        result *paymentResult) *channeldb.FailureReason {
3✔
671

3✔
672
        // Interpret result.
3✔
673
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
3✔
674

3✔
675
        if i.policyFailure != nil {
6✔
676
                if m.state.requestSecondChance(
3✔
677
                        time.Unix(0, int64(result.timeReply.Val)),
3✔
678
                        i.policyFailure.From, i.policyFailure.To,
3✔
679
                ) {
6✔
680
                        return nil
3✔
681
                }
3✔
682
        }
683

684
        // If there is a node-level failure, record a failure for every tried
685
        // connection of that node. A node-level failure can be considered as a
686
        // failure that would have occurred with any of the node's channels.
687
        //
688
        // Ideally we'd also record the failure for the untried connections of
689
        // the node. Unfortunately this would require access to the graph and
690
        // adding this dependency and db calls does not outweigh the benefits.
691
        //
692
        // Untried connections will fall back to the node probability. After the
693
        // call to setAllPairResult below, the node probability will be equal to
694
        // the probability of the tried channels except that the a priori
695
        // probability is mixed in too. This effect is controlled by the
696
        // aprioriWeight parameter. If that parameter isn't set to an extreme
697
        // and there are a few known connections, there shouldn't be much of a
698
        // difference. The largest difference occurs when aprioriWeight is 1. In
699
        // that case, a node-level failure would not be applied to untried
700
        // channels.
701
        if i.nodeFailure != nil {
3✔
UNCOV
702
                m.log.Debugf("Reporting node failure to Mission Control: "+
×
UNCOV
703
                        "node=%v", *i.nodeFailure)
×
UNCOV
704

×
UNCOV
705
                m.state.setAllFail(
×
UNCOV
706
                        *i.nodeFailure,
×
UNCOV
707
                        time.Unix(0, int64(result.timeReply.Val)),
×
UNCOV
708
                )
×
UNCOV
709
        }
×
710

711
        for pair, pairResult := range i.pairResults {
6✔
712
                pairResult := pairResult
3✔
713

3✔
714
                if pairResult.success {
6✔
715
                        m.log.Debugf("Reporting pair success to Mission "+
3✔
716
                                "Control: pair=%v, amt=%v",
3✔
717
                                pair, pairResult.amt)
3✔
718
                } else {
6✔
719
                        m.log.Debugf("Reporting pair failure to Mission "+
3✔
720
                                "Control: pair=%v, amt=%v",
3✔
721
                                pair, pairResult.amt)
3✔
722
                }
3✔
723

724
                m.state.setLastPairResult(
3✔
725
                        pair.From, pair.To,
3✔
726
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
3✔
727
                        false,
3✔
728
                )
3✔
729
        }
730

731
        return i.finalFailureReason
3✔
732
}
733

734
// namespacedDB is an implementation of the missionControlDB that gives a user
735
// of the interface access to a namespaced bucket within the top level mission
736
// control bucket.
737
type namespacedDB struct {
738
        topLevelBucketKey []byte
739
        namespace         []byte
740
        db                kvdb.Backend
741
}
742

743
// A compile-time check to ensure that namespacedDB implements missionControlDB.
744
var _ missionControlDB = (*namespacedDB)(nil)
745

746
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
747
// default namespace.
UNCOV
748
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
×
UNCOV
749
        return newNamespacedDB(db, DefaultMissionControlNamespace)
×
UNCOV
750
}
×
751

752
// newNamespacedDB creates a new instance of missionControlDB where the DB will
753
// have access to a namespaced bucket within the top level mission control
754
// bucket.
755
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
3✔
756
        return &namespacedDB{
3✔
757
                db:                db,
3✔
758
                namespace:         []byte(namespace),
3✔
759
                topLevelBucketKey: resultsKey,
3✔
760
        }
3✔
761
}
3✔
762

763
// update can be used to perform reads and writes on the given bucket.
764
//
765
// NOTE: this is part of the missionControlDB interface.
766
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
767
        reset func()) error {
3✔
768

3✔
769
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
770
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
3✔
771
                if err != nil {
3✔
772
                        return fmt.Errorf("cannot create top level mission "+
×
773
                                "control bucket: %w", err)
×
774
                }
×
775

776
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
3✔
777
                        n.namespace,
3✔
778
                )
3✔
779
                if err != nil {
3✔
780
                        return fmt.Errorf("cannot create namespaced bucket "+
×
781
                                "(%s) in mission control store: %w",
×
782
                                n.namespace, err)
×
783
                }
×
784

785
                return f(namespacedBkt)
3✔
786
        }, reset)
787
}
788

789
// view can be used to perform reads on the given bucket.
790
//
791
// NOTE: this is part of the missionControlDB interface.
792
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
793
        reset func()) error {
3✔
794

3✔
795
        return n.db.View(func(tx kvdb.RTx) error {
6✔
796
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
3✔
797
                if mcStoreBkt == nil {
3✔
798
                        return fmt.Errorf("top level mission control bucket " +
×
799
                                "not found")
×
800
                }
×
801

802
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
3✔
803
                if namespacedBkt == nil {
3✔
804
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
805
                                "in mission control store", n.namespace)
×
806
                }
×
807

808
                return f(namespacedBkt)
3✔
809
        }, reset)
810
}
811

812
// purge will delete all the contents in the namespace.
813
//
814
// NOTE: this is part of the missionControlDB interface.
815
func (n *namespacedDB) purge() error {
3✔
816
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
817
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
818
                if mcStoreBkt == nil {
3✔
819
                        return nil
×
820
                }
×
821

822
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
823
                if err != nil {
3✔
824
                        return err
×
825
                }
×
826

827
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
828

3✔
829
                return err
3✔
830
        }, func() {})
3✔
831
}
832

833
// newPaymentFailure constructs a new paymentFailure struct. If the source
834
// index is nil, then an empty paymentFailure is returned. This represents a
835
// failure with unknown details. Otherwise, the index and failure message are
836
// used to populate the info field of the paymentFailure.
837
func newPaymentFailure(sourceIdx *int,
838
        failureMsg lnwire.FailureMessage) paymentFailure {
3✔
839

3✔
840
        // If we can't identify a failure source, we also won't have a decrypted
3✔
841
        // failure message. In this case we return an empty payment failure.
3✔
842
        if sourceIdx == nil {
3✔
UNCOV
843
                return paymentFailure{}
×
UNCOV
844
        }
×
845

846
        info := paymentFailure{
3✔
847
                sourceIdx: tlv.SomeRecordT(
3✔
848
                        tlv.NewPrimitiveRecord[tlv.TlvType0](
3✔
849
                                uint8(*sourceIdx),
3✔
850
                        ),
3✔
851
                ),
3✔
852
        }
3✔
853

3✔
854
        if failureMsg != nil {
6✔
855
                info.msg = tlv.SomeRecordT(
3✔
856
                        tlv.NewRecordT[tlv.TlvType1](
3✔
857
                                failureMessage{failureMsg},
3✔
858
                        ),
3✔
859
                )
3✔
860
        }
3✔
861

862
        return info
3✔
863
}
864

865
// paymentFailure holds additional information about a payment failure.
866
type paymentFailure struct {
867
        // sourceIdx is the hop the error was reported from. In order to be able
868
        // to decrypt the error message, we need to know the source, which is
869
        // why an error message can only be present if the source is known.
870
        sourceIdx tlv.OptionalRecordT[tlv.TlvType0, uint8]
871

872
        // msg is the error why a payment failed. If we identify the failure of
873
        // a certain hop at the above index, but aren't able to decode the
874
        // failure message we indicate this by not setting this field.
875
        msg tlv.OptionalRecordT[tlv.TlvType1, failureMessage]
876
}
877

878
// Record returns a TLV record that can be used to encode/decode a
879
// paymentFailure to/from a TLV stream.
880
func (r *paymentFailure) Record() tlv.Record {
3✔
881
        recordSize := func() uint64 {
6✔
882
                var (
3✔
883
                        b   bytes.Buffer
3✔
884
                        buf [8]byte
3✔
885
                )
3✔
886
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
3✔
887
                        panic(err)
×
888
                }
889

890
                return uint64(len(b.Bytes()))
3✔
891
        }
892

893
        return tlv.MakeDynamicRecord(
3✔
894
                0, r, recordSize, encodePaymentFailure,
3✔
895
                decodePaymentFailure,
3✔
896
        )
3✔
897
}
898

899
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
3✔
900
        if v, ok := val.(*paymentFailure); ok {
6✔
901
                var recordProducers []tlv.RecordProducer
3✔
902

3✔
903
                v.sourceIdx.WhenSome(
3✔
904
                        func(r tlv.RecordT[tlv.TlvType0, uint8]) {
6✔
905
                                recordProducers = append(
3✔
906
                                        recordProducers, &r,
3✔
907
                                )
3✔
908
                        },
3✔
909
                )
910

911
                v.msg.WhenSome(
3✔
912
                        func(r tlv.RecordT[tlv.TlvType1, failureMessage]) {
6✔
913
                                recordProducers = append(
3✔
914
                                        recordProducers, &r,
3✔
915
                                )
3✔
916
                        },
3✔
917
                )
918

919
                return lnwire.EncodeRecordsTo(
3✔
920
                        w, lnwire.ProduceRecordsSorted(
3✔
921
                                recordProducers...,
3✔
922
                        ),
3✔
923
                )
3✔
924
        }
925

926
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
927
}
928

929
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
930
        l uint64) error {
3✔
931

3✔
932
        if v, ok := val.(*paymentFailure); ok {
6✔
933
                var h paymentFailure
3✔
934

3✔
935
                sourceIdx := tlv.ZeroRecordT[tlv.TlvType0, uint8]()
3✔
936
                msg := tlv.ZeroRecordT[tlv.TlvType1, failureMessage]()
3✔
937

3✔
938
                typeMap, err := lnwire.DecodeRecords(
3✔
939
                        r,
3✔
940
                        lnwire.ProduceRecordsSorted(&sourceIdx, &msg)...,
3✔
941
                )
3✔
942

3✔
943
                if err != nil {
3✔
944
                        return err
×
945
                }
×
946

947
                if _, ok := typeMap[h.sourceIdx.TlvType()]; ok {
6✔
948
                        h.sourceIdx = tlv.SomeRecordT(sourceIdx)
3✔
949
                }
3✔
950

951
                if _, ok := typeMap[h.msg.TlvType()]; ok {
6✔
952
                        h.msg = tlv.SomeRecordT(msg)
3✔
953
                }
3✔
954

955
                *v = h
3✔
956

3✔
957
                return nil
3✔
958
        }
959

960
        return tlv.NewTypeForDecodingErr(
×
961
                val, "routing.paymentFailure", l, l,
×
962
        )
×
963
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc