• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12012751795

25 Nov 2024 02:40PM UTC coverage: 49.835% (-9.2%) from 59.013%
12012751795

Pull #9303

github

yyforyongyu
lnwallet: add debug logs
Pull Request #9303: htlcswitch+routing: handle nil pointer dereference properly

20 of 23 new or added lines in 4 files covered. (86.96%)

25467 existing lines in 425 files now uncovered.

99835 of 200331 relevant lines covered (49.84%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.52
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/build"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/clock"
17
        "github.com/lightningnetwork/lnd/fn"
18
        "github.com/lightningnetwork/lnd/kvdb"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/lightningnetwork/lnd/tlv"
22
)
23

24
const (
25
        // DefaultPenaltyHalfLife is the default half-life duration. The
26
        // half-life duration defines after how much time a penalized node or
27
        // channel is back at 50% probability.
28
        DefaultPenaltyHalfLife = time.Hour
29

30
        // minSecondChanceInterval is the minimum time required between
31
        // second-chance failures.
32
        //
33
        // If nodes return a channel policy related failure, they may get a
34
        // second chance to forward the payment. It could be that the channel
35
        // policy that we are aware of is not up to date. This is especially
36
        // important in case of mobile apps that are mostly offline.
37
        //
38
        // However, we don't want to give nodes the option to endlessly return
39
        // new channel updates so that we are kept busy trying to route through
40
        // that node until the payment loop times out.
41
        //
42
        // Therefore we only grant a second chance to a node if the previous
43
        // second chance is sufficiently long ago. This is what
44
        // minSecondChanceInterval defines. If a second policy failure comes in
45
        // within that interval, we will apply a penalty.
46
        //
47
        // Second chances granted are tracked on the level of node pairs. This
48
        // means that if a node has multiple channels to the same peer, they
49
        // will only get a single second chance to route to that peer again.
50
        // Nodes forward non-strict, so it isn't necessary to apply a less
51
        // restrictive channel level tracking scheme here.
52
        minSecondChanceInterval = time.Minute
53

54
        // DefaultMaxMcHistory is the default maximum history size.
55
        DefaultMaxMcHistory = 1000
56

57
        // DefaultMcFlushInterval is the default interval we use to flush MC state
58
        // to the database.
59
        DefaultMcFlushInterval = time.Second
60

61
        // prevSuccessProbability is the assumed probability for node pairs that
62
        // successfully relayed the previous attempt.
63
        prevSuccessProbability = 0.95
64

65
        // DefaultAprioriWeight is the default a priori weight. See
66
        // MissionControlConfig for further explanation.
67
        DefaultAprioriWeight = 0.5
68

69
        // DefaultMinFailureRelaxInterval is the default minimum time that must
70
        // have passed since the previously recorded failure before the failure
71
        // amount may be raised.
72
        DefaultMinFailureRelaxInterval = time.Minute
73

74
        // DefaultFeeEstimationTimeout is the default value for
75
        // FeeEstimationTimeout. It defines the maximum duration that the
76
        // probing fee estimation is allowed to take.
77
        DefaultFeeEstimationTimeout = time.Minute
78

79
        // DefaultMissionControlNamespace is the name of the default mission
80
        // control name space. This is used as the sub-bucket key within the
81
        // top level DB bucket to store mission control results.
82
        DefaultMissionControlNamespace = "default"
83
)
84

85
var (
86
        // ErrInvalidMcHistory is returned if we get a negative mission control
87
        // history count.
88
        ErrInvalidMcHistory = errors.New("mission control history must be " +
89
                ">= 0")
90

91
        // ErrInvalidFailureInterval is returned if we get an invalid failure
92
        // interval.
93
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
94
)
95

96
// NodeResults contains previous results from a node to its peers.
97
type NodeResults map[route.Vertex]TimedPairResult
98

99
// mcConfig holds various config members that will be required by all
100
// MissionControl instances and will be the same regardless of namespace.
101
type mcConfig struct {
102
        // clock is a time source used by mission control.
103
        clock clock.Clock
104

105
        // selfNode is our pubkey.
106
        selfNode route.Vertex
107
}
108

109
// MissionControl contains state which summarizes the past attempts of HTLC
110
// routing by external callers when sending payments throughout the network. It
111
// acts as a shared memory during routing attempts with the goal to optimize the
112
// payment attempt success rate.
113
//
114
// Failed payment attempts are reported to mission control. These reports are
115
// used to track the time of the last node or channel level failure. The time
116
// since the last failure is used to estimate a success probability that is fed
117
// into the path finding process for subsequent payment attempts.
118
type MissionControl struct {
119
        cfg *mcConfig
120

121
        // state is the internal mission control state that is input for
122
        // probability estimation.
123
        state *missionControlState
124

125
        store *missionControlStore
126

127
        // estimator is the probability estimator that is used with the payment
128
        // results that mission control collects.
129
        estimator Estimator
130

131
        // onConfigUpdate is a function that is called whenever the
132
        // mission control state is updated.
133
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
134

135
        log btclog.Logger
136

137
        mu sync.Mutex
138
}
139

140
// MissionController manages MissionControl instances in various namespaces.
141
type MissionController struct {
142
        db           kvdb.Backend
143
        cfg          *mcConfig
144
        defaultMCCfg *MissionControlConfig
145

146
        mc map[string]*MissionControl
147
        mu sync.Mutex
148

149
        // TODO(roasbeef): further counters, if vertex continually unavailable,
150
        // add to another generation
151

152
        // TODO(roasbeef): also add favorable metrics for nodes
153
}
154

155
// GetNamespacedStore returns the MissionControl in the given namespace. If one
156
// does not yet exist, then it is initialised.
157
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
158
        error) {
4✔
159

4✔
160
        m.mu.Lock()
4✔
161
        defer m.mu.Unlock()
4✔
162

4✔
163
        if mc, ok := m.mc[ns]; ok {
8✔
164
                return mc, nil
4✔
165
        }
4✔
166

UNCOV
167
        return m.initMissionControl(ns)
×
168
}
169

170
// ListNamespaces returns a list of the namespaces that the MissionController
171
// is aware of.
UNCOV
172
func (m *MissionController) ListNamespaces() []string {
×
UNCOV
173
        m.mu.Lock()
×
UNCOV
174
        defer m.mu.Unlock()
×
UNCOV
175

×
UNCOV
176
        namespaces := make([]string, 0, len(m.mc))
×
UNCOV
177
        for ns := range m.mc {
×
UNCOV
178
                namespaces = append(namespaces, ns)
×
UNCOV
179
        }
×
180

UNCOV
181
        return namespaces
×
182
}
183

184
// MissionControlConfig defines parameters that control mission control
185
// behaviour.
186
type MissionControlConfig struct {
187
        // Estimator gives probability estimates for node pairs.
188
        Estimator Estimator
189

190
        // OnConfigUpdate is function that is called whenever the
191
        // mission control state is updated.
192
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
193

194
        // MaxMcHistory defines the maximum number of payment results that are
195
        // held on disk.
196
        MaxMcHistory int
197

198
        // McFlushInterval defines the ticker interval when we flush the
199
        // accumulated state to the DB.
200
        McFlushInterval time.Duration
201

202
        // MinFailureRelaxInterval is the minimum time that must have passed
203
        // since the previously recorded failure before the failure amount may
204
        // be raised.
205
        MinFailureRelaxInterval time.Duration
206
}
207

208
func (c *MissionControlConfig) validate() error {
4✔
209
        if c.MaxMcHistory < 0 {
4✔
210
                return ErrInvalidMcHistory
×
211
        }
×
212

213
        if c.MinFailureRelaxInterval < 0 {
4✔
214
                return ErrInvalidFailureInterval
×
215
        }
×
216

217
        return nil
4✔
218
}
219

220
// String returns a string representation of a mission control config.
221
func (c *MissionControlConfig) String() string {
4✔
222
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
4✔
223
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
4✔
224
}
4✔
225

226
// TimedPairResult describes a timestamped pair result.
227
type TimedPairResult struct {
228
        // FailTime is the time of the last failure.
229
        FailTime time.Time
230

231
        // FailAmt is the amount of the last failure. This amount may be pushed
232
        // up if a later success is higher than the last failed amount.
233
        FailAmt lnwire.MilliSatoshi
234

235
        // SuccessTime is the time of the last success.
236
        SuccessTime time.Time
237

238
        // SuccessAmt is the highest amount that successfully forwarded. This
239
        // isn't necessarily the last success amount. The value of this field
240
        // may also be pushed down if a later failure is lower than the highest
241
        // success amount. Because of this, SuccessAmt may not match
242
        // SuccessTime.
243
        SuccessAmt lnwire.MilliSatoshi
244
}
245

246
// MissionControlSnapshot contains a snapshot of the current state of mission
247
// control.
248
type MissionControlSnapshot struct {
249
        // Pairs is a list of channels for which specific information is
250
        // logged.
251
        Pairs []MissionControlPairSnapshot
252
}
253

254
// MissionControlPairSnapshot contains a snapshot of the current node pair
255
// state in mission control.
256
type MissionControlPairSnapshot struct {
257
        // Pair is the node pair of which the state is described.
258
        Pair DirectedNodePair
259

260
        // TimedPairResult contains the data for this pair.
261
        TimedPairResult
262
}
263

264
// paymentResult is the information that becomes available when a payment
265
// attempt completes.
266
type paymentResult struct {
267
        id        uint64
268
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
269
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
270
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
271

272
        // failure holds information related to the failure of a payment. The
273
        // presence of this record indicates a payment failure. The absence of
274
        // this record indicates a successful payment.
275
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
276
}
277

278
// newPaymentResult constructs a new paymentResult.
279
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
280
        failure *paymentFailure) *paymentResult {
4✔
281

4✔
282
        result := &paymentResult{
4✔
283
                id: id,
4✔
284
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
4✔
285
                        uint64(timeFwd.UnixNano()),
4✔
286
                ),
4✔
287
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
4✔
288
                        uint64(timeReply.UnixNano()),
4✔
289
                ),
4✔
290
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
4✔
291
        }
4✔
292

4✔
293
        if failure != nil {
8✔
294
                result.failure = tlv.SomeRecordT(
4✔
295
                        tlv.NewRecordT[tlv.TlvType3](*failure),
4✔
296
                )
4✔
297
        }
4✔
298

299
        return result
4✔
300
}
301

302
// NewMissionController returns a new instance of MissionController.
303
func NewMissionController(db kvdb.Backend, self route.Vertex,
304
        cfg *MissionControlConfig) (*MissionController, error) {
4✔
305

4✔
306
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
4✔
307
                cfg.Estimator)
4✔
308

4✔
309
        if err := cfg.validate(); err != nil {
4✔
310
                return nil, err
×
311
        }
×
312

313
        mcCfg := &mcConfig{
4✔
314
                clock:    clock.NewDefaultClock(),
4✔
315
                selfNode: self,
4✔
316
        }
4✔
317

4✔
318
        mgr := &MissionController{
4✔
319
                db:           db,
4✔
320
                defaultMCCfg: cfg,
4✔
321
                cfg:          mcCfg,
4✔
322
                mc:           make(map[string]*MissionControl),
4✔
323
        }
4✔
324

4✔
325
        if err := mgr.loadMissionControls(); err != nil {
4✔
326
                return nil, err
×
327
        }
×
328

329
        for _, mc := range mgr.mc {
8✔
330
                if err := mc.init(); err != nil {
4✔
331
                        return nil, err
×
332
                }
×
333
        }
334

335
        return mgr, nil
4✔
336
}
337

338
// loadMissionControls initialises a MissionControl in the default namespace if
339
// one does not yet exist. It then initialises a MissionControl for all other
340
// namespaces found in the DB.
341
//
342
// NOTE: this should only be called once during MissionController construction.
343
func (m *MissionController) loadMissionControls() error {
4✔
344
        m.mu.Lock()
4✔
345
        defer m.mu.Unlock()
4✔
346

4✔
347
        // Always initialise the default namespace.
4✔
348
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
4✔
349
        if err != nil {
4✔
350
                return err
×
351
        }
×
352

353
        namespaces := make(map[string]struct{})
4✔
354
        err = m.db.View(func(tx walletdb.ReadTx) error {
8✔
355
                mcStoreBkt := tx.ReadBucket(resultsKey)
4✔
356
                if mcStoreBkt == nil {
4✔
357
                        return fmt.Errorf("top level mission control bucket " +
×
358
                                "not found")
×
359
                }
×
360

361
                // Iterate through all the keys in the bucket and collect the
362
                // namespaces.
363
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
8✔
364
                        // We've already initialised the default namespace so
4✔
365
                        // we can skip it.
4✔
366
                        if string(k) == DefaultMissionControlNamespace {
8✔
367
                                return nil
4✔
368
                        }
4✔
369

UNCOV
370
                        namespaces[string(k)] = struct{}{}
×
UNCOV
371

×
UNCOV
372
                        return nil
×
373
                })
374
        }, func() {})
4✔
375
        if err != nil {
4✔
376
                return err
×
377
        }
×
378

379
        // Now, iterate through all the namespaces and initialise them.
380
        for ns := range namespaces {
4✔
UNCOV
381
                _, err = m.initMissionControl(ns)
×
UNCOV
382
                if err != nil {
×
383
                        return err
×
384
                }
×
385
        }
386

387
        return nil
4✔
388
}
389

390
// initMissionControl creates a new MissionControl instance with the given
391
// namespace if one does not yet exist.
392
//
393
// NOTE: the MissionController's mutex must be held before calling this method.
394
func (m *MissionController) initMissionControl(namespace string) (
395
        *MissionControl, error) {
4✔
396

4✔
397
        // If a mission control with this namespace has already been initialised
4✔
398
        // then there is nothing left to do.
4✔
399
        if mc, ok := m.mc[namespace]; ok {
4✔
400
                return mc, nil
×
401
        }
×
402

403
        cfg := m.defaultMCCfg
4✔
404

4✔
405
        store, err := newMissionControlStore(
4✔
406
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
4✔
407
                cfg.McFlushInterval,
4✔
408
        )
4✔
409
        if err != nil {
4✔
410
                return nil, err
×
411
        }
×
412

413
        mc := &MissionControl{
4✔
414
                cfg:       m.cfg,
4✔
415
                state:     newMissionControlState(cfg.MinFailureRelaxInterval),
4✔
416
                store:     store,
4✔
417
                estimator: cfg.Estimator,
4✔
418
                log: build.NewPrefixLog(
4✔
419
                        fmt.Sprintf("[%s]:", namespace), log,
4✔
420
                ),
4✔
421
                onConfigUpdate: cfg.OnConfigUpdate,
4✔
422
        }
4✔
423

4✔
424
        m.mc[namespace] = mc
4✔
425

4✔
426
        return mc, nil
4✔
427
}
428

429
// RunStoreTickers runs the mission controller store's tickers.
430
func (m *MissionController) RunStoreTickers() {
4✔
431
        m.mu.Lock()
4✔
432
        defer m.mu.Unlock()
4✔
433

4✔
434
        for _, mc := range m.mc {
8✔
435
                mc.store.run()
4✔
436
        }
4✔
437
}
438

439
// StopStoreTickers stops the mission control store's tickers.
440
func (m *MissionController) StopStoreTickers() {
4✔
441
        log.Debug("Stopping mission control store ticker")
4✔
442
        defer log.Debug("Mission control store ticker stopped")
4✔
443

4✔
444
        m.mu.Lock()
4✔
445
        defer m.mu.Unlock()
4✔
446

4✔
447
        for _, mc := range m.mc {
8✔
448
                mc.store.stop()
4✔
449
        }
4✔
450
}
451

452
// init initializes mission control with historical data.
453
func (m *MissionControl) init() error {
4✔
454
        m.log.Debugf("Mission control state reconstruction started")
4✔
455

4✔
456
        m.mu.Lock()
4✔
457
        defer m.mu.Unlock()
4✔
458

4✔
459
        start := time.Now()
4✔
460

4✔
461
        results, err := m.store.fetchAll()
4✔
462
        if err != nil {
4✔
463
                return err
×
464
        }
×
465

466
        for _, result := range results {
8✔
467
                _ = m.applyPaymentResult(result)
4✔
468
        }
4✔
469

470
        m.log.Debugf("Mission control state reconstruction finished: "+
4✔
471
                "n=%v, time=%v", len(results), time.Since(start))
4✔
472

4✔
473
        return nil
4✔
474
}
475

476
// GetConfig returns the config that mission control is currently configured
477
// with. All fields are copied by value, so we do not need to worry about
478
// mutation.
479
func (m *MissionControl) GetConfig() *MissionControlConfig {
4✔
480
        m.mu.Lock()
4✔
481
        defer m.mu.Unlock()
4✔
482

4✔
483
        return &MissionControlConfig{
4✔
484
                Estimator:               m.estimator,
4✔
485
                MaxMcHistory:            m.store.maxRecords,
4✔
486
                McFlushInterval:         m.store.flushInterval,
4✔
487
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
4✔
488
        }
4✔
489
}
4✔
490

491
// SetConfig validates the config provided and updates mission control's config
492
// if it is valid.
493
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
4✔
494
        if cfg == nil {
4✔
495
                return errors.New("nil mission control config")
×
496
        }
×
497

498
        if err := cfg.validate(); err != nil {
4✔
499
                return err
×
500
        }
×
501

502
        m.mu.Lock()
4✔
503
        defer m.mu.Unlock()
4✔
504

4✔
505
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
4✔
506
                cfg.Estimator)
4✔
507

4✔
508
        m.store.maxRecords = cfg.MaxMcHistory
4✔
509
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
4✔
510
        m.estimator = cfg.Estimator
4✔
511

4✔
512
        // Execute the callback function if it is set.
4✔
513
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
8✔
514
                f(cfg)
4✔
515
        })
4✔
516

517
        return nil
4✔
518
}
519

520
// ResetHistory resets the history of MissionControl returning it to a state as
521
// if no payment attempts have been made.
522
func (m *MissionControl) ResetHistory() error {
4✔
523
        m.mu.Lock()
4✔
524
        defer m.mu.Unlock()
4✔
525

4✔
526
        if err := m.store.clear(); err != nil {
4✔
527
                return err
×
528
        }
×
529

530
        m.state.resetHistory()
4✔
531

4✔
532
        m.log.Debugf("Mission control history cleared")
4✔
533

4✔
534
        return nil
4✔
535
}
536

537
// GetProbability is expected to return the success probability of a payment
538
// from fromNode along edge.
539
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
540
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
4✔
541

4✔
542
        m.mu.Lock()
4✔
543
        defer m.mu.Unlock()
4✔
544

4✔
545
        now := m.cfg.clock.Now()
4✔
546
        results, _ := m.state.getLastPairResult(fromNode)
4✔
547

4✔
548
        // Use a distinct probability estimation function for local channels.
4✔
549
        if fromNode == m.cfg.selfNode {
8✔
550
                return m.estimator.LocalPairProbability(now, results, toNode)
4✔
551
        }
4✔
552

553
        return m.estimator.PairProbability(
4✔
554
                now, results, toNode, amt, capacity,
4✔
555
        )
4✔
556
}
557

558
// GetHistorySnapshot takes a snapshot from the current mission control state
559
// and actual probability estimates.
UNCOV
560
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
×
UNCOV
561
        m.mu.Lock()
×
UNCOV
562
        defer m.mu.Unlock()
×
UNCOV
563

×
UNCOV
564
        m.log.Debugf("Requesting history snapshot from mission control")
×
UNCOV
565

×
UNCOV
566
        return m.state.getSnapshot()
×
UNCOV
567
}
×
568

569
// ImportHistory imports the set of mission control results provided to our
570
// in-memory state. These results are not persisted, so will not survive
571
// restarts.
572
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
573
        force bool) error {
4✔
574

4✔
575
        if history == nil {
4✔
576
                return errors.New("cannot import nil history")
×
577
        }
×
578

579
        m.mu.Lock()
4✔
580
        defer m.mu.Unlock()
4✔
581

4✔
582
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
4✔
583
                "control", len(history.Pairs))
4✔
584

4✔
585
        imported := m.state.importSnapshot(history, force)
4✔
586

4✔
587
        m.log.Infof("Imported %v results to mission control", imported)
4✔
588

4✔
589
        return nil
4✔
590
}
591

592
// GetPairHistorySnapshot returns the stored history for a given node pair.
593
func (m *MissionControl) GetPairHistorySnapshot(
594
        fromNode, toNode route.Vertex) TimedPairResult {
4✔
595

4✔
596
        m.mu.Lock()
4✔
597
        defer m.mu.Unlock()
4✔
598

4✔
599
        results, ok := m.state.getLastPairResult(fromNode)
4✔
600
        if !ok {
8✔
601
                return TimedPairResult{}
4✔
602
        }
4✔
603

604
        result, ok := results[toNode]
4✔
605
        if !ok {
8✔
606
                return TimedPairResult{}
4✔
607
        }
4✔
608

609
        return result
4✔
610
}
611

612
// ReportPaymentFail reports a failed payment to mission control as input for
613
// future probability estimates. The failureSourceIdx argument indicates the
614
// failure source. If it is nil, the failure source is unknown. This function
615
// returns a reason if this failure is a final failure. In that case no further
616
// payment attempts need to be made.
617
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
618
        failureSourceIdx *int, failure lnwire.FailureMessage) (
619
        *channeldb.FailureReason, error) {
4✔
620

4✔
621
        timestamp := m.cfg.clock.Now()
4✔
622

4✔
623
        result := newPaymentResult(
4✔
624
                paymentID, extractMCRoute(rt), timestamp, timestamp,
4✔
625
                newPaymentFailure(failureSourceIdx, failure),
4✔
626
        )
4✔
627

4✔
628
        return m.processPaymentResult(result)
4✔
629
}
4✔
630

631
// ReportPaymentSuccess reports a successful payment to mission control as input
632
// for future probability estimates.
633
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
634
        rt *route.Route) error {
4✔
635

4✔
636
        timestamp := m.cfg.clock.Now()
4✔
637

4✔
638
        result := newPaymentResult(
4✔
639
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
4✔
640
        )
4✔
641

4✔
642
        _, err := m.processPaymentResult(result)
4✔
643

4✔
644
        return err
4✔
645
}
4✔
646

647
// processPaymentResult stores a payment result in the mission control store and
648
// updates mission control's in-memory state.
649
func (m *MissionControl) processPaymentResult(result *paymentResult) (
650
        *channeldb.FailureReason, error) {
4✔
651

4✔
652
        // Store complete result in database.
4✔
653
        m.store.AddResult(result)
4✔
654

4✔
655
        m.mu.Lock()
4✔
656
        defer m.mu.Unlock()
4✔
657

4✔
658
        // Apply result to update mission control state.
4✔
659
        reason := m.applyPaymentResult(result)
4✔
660

4✔
661
        return reason, nil
4✔
662
}
4✔
663

664
// applyPaymentResult applies a payment result as input for future probability
665
// estimates. It returns a bool indicating whether this error is a final error
666
// and no further payment attempts need to be made.
667
func (m *MissionControl) applyPaymentResult(
668
        result *paymentResult) *channeldb.FailureReason {
4✔
669

4✔
670
        // Interpret result.
4✔
671
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
4✔
672

4✔
673
        if i.policyFailure != nil {
8✔
674
                if m.state.requestSecondChance(
4✔
675
                        time.Unix(0, int64(result.timeReply.Val)),
4✔
676
                        i.policyFailure.From, i.policyFailure.To,
4✔
677
                ) {
8✔
678
                        return nil
4✔
679
                }
4✔
680
        }
681

682
        // If there is a node-level failure, record a failure for every tried
683
        // connection of that node. A node-level failure can be considered as a
684
        // failure that would have occurred with any of the node's channels.
685
        //
686
        // Ideally we'd also record the failure for the untried connections of
687
        // the node. Unfortunately this would require access to the graph and
688
        // adding this dependency and db calls does not outweigh the benefits.
689
        //
690
        // Untried connections will fall back to the node probability. After the
691
        // call to setAllPairResult below, the node probability will be equal to
692
        // the probability of the tried channels except that the a priori
693
        // probability is mixed in too. This effect is controlled by the
694
        // aprioriWeight parameter. If that parameter isn't set to an extreme
695
        // and there are a few known connections, there shouldn't be much of a
696
        // difference. The largest difference occurs when aprioriWeight is 1. In
697
        // that case, a node-level failure would not be applied to untried
698
        // channels.
699
        if i.nodeFailure != nil {
4✔
UNCOV
700
                m.log.Debugf("Reporting node failure to Mission Control: "+
×
UNCOV
701
                        "node=%v", *i.nodeFailure)
×
UNCOV
702

×
UNCOV
703
                m.state.setAllFail(
×
UNCOV
704
                        *i.nodeFailure,
×
UNCOV
705
                        time.Unix(0, int64(result.timeReply.Val)),
×
UNCOV
706
                )
×
UNCOV
707
        }
×
708

709
        for pair, pairResult := range i.pairResults {
8✔
710
                pairResult := pairResult
4✔
711

4✔
712
                if pairResult.success {
8✔
713
                        m.log.Debugf("Reporting pair success to Mission "+
4✔
714
                                "Control: pair=%v, amt=%v",
4✔
715
                                pair, pairResult.amt)
4✔
716
                } else {
8✔
717
                        m.log.Debugf("Reporting pair failure to Mission "+
4✔
718
                                "Control: pair=%v, amt=%v",
4✔
719
                                pair, pairResult.amt)
4✔
720
                }
4✔
721

722
                m.state.setLastPairResult(
4✔
723
                        pair.From, pair.To,
4✔
724
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
4✔
725
                        false,
4✔
726
                )
4✔
727
        }
728

729
        return i.finalFailureReason
4✔
730
}
731

732
// namespacedDB is an implementation of the missionControlDB that gives a user
733
// of the interface access to a namespaced bucket within the top level mission
734
// control bucket.
735
type namespacedDB struct {
736
        topLevelBucketKey []byte
737
        namespace         []byte
738
        db                kvdb.Backend
739
}
740

741
// A compile-time check to ensure that namespacedDB implements missionControlDB.
742
var _ missionControlDB = (*namespacedDB)(nil)
743

744
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
745
// default namespace.
UNCOV
746
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
×
UNCOV
747
        return newNamespacedDB(db, DefaultMissionControlNamespace)
×
UNCOV
748
}
×
749

750
// newNamespacedDB creates a new instance of missionControlDB where the DB will
751
// have access to a namespaced bucket within the top level mission control
752
// bucket.
753
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
4✔
754
        return &namespacedDB{
4✔
755
                db:                db,
4✔
756
                namespace:         []byte(namespace),
4✔
757
                topLevelBucketKey: resultsKey,
4✔
758
        }
4✔
759
}
4✔
760

761
// update can be used to perform reads and writes on the given bucket.
762
//
763
// NOTE: this is part of the missionControlDB interface.
764
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
765
        reset func()) error {
4✔
766

4✔
767
        return n.db.Update(func(tx kvdb.RwTx) error {
8✔
768
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
4✔
769
                if err != nil {
4✔
770
                        return fmt.Errorf("cannot create top level mission "+
×
771
                                "control bucket: %w", err)
×
772
                }
×
773

774
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
4✔
775
                        n.namespace,
4✔
776
                )
4✔
777
                if err != nil {
4✔
778
                        return fmt.Errorf("cannot create namespaced bucket "+
×
779
                                "(%s) in mission control store: %w",
×
780
                                n.namespace, err)
×
781
                }
×
782

783
                return f(namespacedBkt)
4✔
784
        }, reset)
785
}
786

787
// view can be used to perform reads on the given bucket.
788
//
789
// NOTE: this is part of the missionControlDB interface.
790
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
791
        reset func()) error {
4✔
792

4✔
793
        return n.db.View(func(tx kvdb.RTx) error {
8✔
794
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
4✔
795
                if mcStoreBkt == nil {
4✔
796
                        return fmt.Errorf("top level mission control bucket " +
×
797
                                "not found")
×
798
                }
×
799

800
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
4✔
801
                if namespacedBkt == nil {
4✔
802
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
803
                                "in mission control store", n.namespace)
×
804
                }
×
805

806
                return f(namespacedBkt)
4✔
807
        }, reset)
808
}
809

810
// purge will delete all the contents in the namespace.
811
//
812
// NOTE: this is part of the missionControlDB interface.
813
func (n *namespacedDB) purge() error {
4✔
814
        return n.db.Update(func(tx kvdb.RwTx) error {
8✔
815
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
4✔
816
                if mcStoreBkt == nil {
4✔
817
                        return nil
×
818
                }
×
819

820
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
4✔
821
                if err != nil {
4✔
822
                        return err
×
823
                }
×
824

825
                _, err = mcStoreBkt.CreateBucket(n.namespace)
4✔
826

4✔
827
                return err
4✔
828
        }, func() {})
4✔
829
}
830

831
// paymentFailure represents the presence of a payment failure. It may or may
832
// not include additional information about said failure.
833
type paymentFailure struct {
834
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
835
}
836

837
// newPaymentFailure constructs a new paymentFailure struct. If the source
838
// index is nil, then an empty paymentFailure is returned. This represents a
839
// failure with unknown details. Otherwise, the index and failure message are
840
// used to populate the info field of the paymentFailure.
841
func newPaymentFailure(sourceIdx *int,
842
        failureMsg lnwire.FailureMessage) *paymentFailure {
4✔
843

4✔
844
        if sourceIdx == nil {
4✔
UNCOV
845
                return &paymentFailure{}
×
UNCOV
846
        }
×
847

848
        info := paymentFailureInfo{
4✔
849
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
4✔
850
                        uint8(*sourceIdx),
4✔
851
                ),
4✔
852
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
4✔
853
        }
4✔
854

4✔
855
        return &paymentFailure{
4✔
856
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
4✔
857
        }
4✔
858
}
859

860
// Record returns a TLV record that can be used to encode/decode a
861
// paymentFailure to/from a TLV stream.
862
func (r *paymentFailure) Record() tlv.Record {
4✔
863
        recordSize := func() uint64 {
8✔
864
                var (
4✔
865
                        b   bytes.Buffer
4✔
866
                        buf [8]byte
4✔
867
                )
4✔
868
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
4✔
869
                        panic(err)
×
870
                }
871

872
                return uint64(len(b.Bytes()))
4✔
873
        }
874

875
        return tlv.MakeDynamicRecord(
4✔
876
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
4✔
877
        )
4✔
878
}
879

880
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
4✔
881
        if v, ok := val.(*paymentFailure); ok {
8✔
882
                var recordProducers []tlv.RecordProducer
4✔
883
                v.info.WhenSome(
4✔
884
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
8✔
885
                                recordProducers = append(recordProducers, &r)
4✔
886
                        },
4✔
887
                )
888

889
                return lnwire.EncodeRecordsTo(
4✔
890
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
4✔
891
                )
4✔
892
        }
893

894
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
895
}
896

897
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
898
        l uint64) error {
4✔
899

4✔
900
        if v, ok := val.(*paymentFailure); ok {
8✔
901
                var h paymentFailure
4✔
902

4✔
903
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
4✔
904
                typeMap, err := lnwire.DecodeRecords(
4✔
905
                        r, lnwire.ProduceRecordsSorted(&info)...,
4✔
906
                )
4✔
907
                if err != nil {
4✔
908
                        return err
×
909
                }
×
910

911
                if _, ok := typeMap[h.info.TlvType()]; ok {
8✔
912
                        h.info = tlv.SomeRecordT(info)
4✔
913
                }
4✔
914

915
                *v = h
4✔
916

4✔
917
                return nil
4✔
918
        }
919

920
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
921
}
922

923
// paymentFailureInfo holds additional information about a payment failure.
924
type paymentFailureInfo struct {
925
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
926
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
927
}
928

929
// Record returns a TLV record that can be used to encode/decode a
930
// paymentFailureInfo to/from a TLV stream.
931
func (r *paymentFailureInfo) Record() tlv.Record {
4✔
932
        recordSize := func() uint64 {
8✔
933
                var (
4✔
934
                        b   bytes.Buffer
4✔
935
                        buf [8]byte
4✔
936
                )
4✔
937
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
4✔
938
                        panic(err)
×
939
                }
940

941
                return uint64(len(b.Bytes()))
4✔
942
        }
943

944
        return tlv.MakeDynamicRecord(
4✔
945
                0, r, recordSize, encodePaymentFailureInfo,
4✔
946
                decodePaymentFailureInfo,
4✔
947
        )
4✔
948
}
949

950
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
4✔
951
        if v, ok := val.(*paymentFailureInfo); ok {
8✔
952
                return lnwire.EncodeRecordsTo(
4✔
953
                        w, lnwire.ProduceRecordsSorted(
4✔
954
                                &v.sourceIdx, &v.msg,
4✔
955
                        ),
4✔
956
                )
4✔
957
        }
4✔
958

959
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
960
}
961

962
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
963
        l uint64) error {
4✔
964

4✔
965
        if v, ok := val.(*paymentFailureInfo); ok {
8✔
966
                var h paymentFailureInfo
4✔
967

4✔
968
                _, err := lnwire.DecodeRecords(
4✔
969
                        r,
4✔
970
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
4✔
971
                )
4✔
972
                if err != nil {
4✔
973
                        return err
×
974
                }
×
975

976
                *v = h
4✔
977

4✔
978
                return nil
4✔
979
        }
980

981
        return tlv.NewTypeForDecodingErr(
×
982
                val, "routing.paymentFailureInfo", l, l,
×
983
        )
×
984
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc