• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12033440129

26 Nov 2024 03:03PM UTC coverage: 48.738% (-10.3%) from 58.999%
12033440129

Pull #9309

github

yyforyongyu
gomod: update `btcd` for shutdown fix
Pull Request #9309: chainntnfs: fix `TestHistoricalConfDetailsTxIndex`

97664 of 200385 relevant lines covered (48.74%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.52
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/build"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/clock"
17
        "github.com/lightningnetwork/lnd/fn"
18
        "github.com/lightningnetwork/lnd/kvdb"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/lightningnetwork/lnd/tlv"
22
)
23

24
const (
25
        // DefaultPenaltyHalfLife is the default half-life duration. The
26
        // half-life duration defines after how much time a penalized node or
27
        // channel is back at 50% probability.
28
        DefaultPenaltyHalfLife = time.Hour
29

30
        // minSecondChanceInterval is the minimum time required between
31
        // second-chance failures.
32
        //
33
        // If nodes return a channel policy related failure, they may get a
34
        // second chance to forward the payment. It could be that the channel
35
        // policy that we are aware of is not up to date. This is especially
36
        // important in case of mobile apps that are mostly offline.
37
        //
38
        // However, we don't want to give nodes the option to endlessly return
39
        // new channel updates so that we are kept busy trying to route through
40
        // that node until the payment loop times out.
41
        //
42
        // Therefore we only grant a second chance to a node if the previous
43
        // second chance is sufficiently long ago. This is what
44
        // minSecondChanceInterval defines. If a second policy failure comes in
45
        // within that interval, we will apply a penalty.
46
        //
47
        // Second chances granted are tracked on the level of node pairs. This
48
        // means that if a node has multiple channels to the same peer, they
49
        // will only get a single second chance to route to that peer again.
50
        // Nodes forward non-strict, so it isn't necessary to apply a less
51
        // restrictive channel level tracking scheme here.
52
        minSecondChanceInterval = time.Minute
53

54
        // DefaultMaxMcHistory is the default maximum history size.
55
        DefaultMaxMcHistory = 1000
56

57
        // DefaultMcFlushInterval is the default interval we use to flush MC state
58
        // to the database.
59
        DefaultMcFlushInterval = time.Second
60

61
        // prevSuccessProbability is the assumed probability for node pairs that
62
        // successfully relayed the previous attempt.
63
        prevSuccessProbability = 0.95
64

65
        // DefaultAprioriWeight is the default a priori weight. See
66
        // MissionControlConfig for further explanation.
67
        DefaultAprioriWeight = 0.5
68

69
        // DefaultMinFailureRelaxInterval is the default minimum time that must
70
        // have passed since the previously recorded failure before the failure
71
        // amount may be raised.
72
        DefaultMinFailureRelaxInterval = time.Minute
73

74
        // DefaultFeeEstimationTimeout is the default value for
75
        // FeeEstimationTimeout. It defines the maximum duration that the
76
        // probing fee estimation is allowed to take.
77
        DefaultFeeEstimationTimeout = time.Minute
78

79
        // DefaultMissionControlNamespace is the name of the default mission
80
        // control name space. This is used as the sub-bucket key within the
81
        // top level DB bucket to store mission control results.
82
        DefaultMissionControlNamespace = "default"
83
)
84

85
var (
86
        // ErrInvalidMcHistory is returned if we get a negative mission control
87
        // history count.
88
        ErrInvalidMcHistory = errors.New("mission control history must be " +
89
                ">= 0")
90

91
        // ErrInvalidFailureInterval is returned if we get an invalid failure
92
        // interval.
93
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
94
)
95

96
// NodeResults contains previous results from a node to its peers.
97
type NodeResults map[route.Vertex]TimedPairResult
98

99
// mcConfig holds various config members that will be required by all
100
// MissionControl instances and will be the same regardless of namespace.
101
type mcConfig struct {
102
        // clock is a time source used by mission control.
103
        clock clock.Clock
104

105
        // selfNode is our pubkey.
106
        selfNode route.Vertex
107
}
108

109
// MissionControl contains state which summarizes the past attempts of HTLC
110
// routing by external callers when sending payments throughout the network. It
111
// acts as a shared memory during routing attempts with the goal to optimize the
112
// payment attempt success rate.
113
//
114
// Failed payment attempts are reported to mission control. These reports are
115
// used to track the time of the last node or channel level failure. The time
116
// since the last failure is used to estimate a success probability that is fed
117
// into the path finding process for subsequent payment attempts.
118
type MissionControl struct {
119
        cfg *mcConfig
120

121
        // state is the internal mission control state that is input for
122
        // probability estimation.
123
        state *missionControlState
124

125
        store *missionControlStore
126

127
        // estimator is the probability estimator that is used with the payment
128
        // results that mission control collects.
129
        estimator Estimator
130

131
        // onConfigUpdate is a function that is called whenever the
132
        // mission control state is updated.
133
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
134

135
        log btclog.Logger
136

137
        mu sync.Mutex
138
}
139

140
// MissionController manages MissionControl instances in various namespaces.
141
type MissionController struct {
142
        db           kvdb.Backend
143
        cfg          *mcConfig
144
        defaultMCCfg *MissionControlConfig
145

146
        mc map[string]*MissionControl
147
        mu sync.Mutex
148

149
        // TODO(roasbeef): further counters, if vertex continually unavailable,
150
        // add to another generation
151

152
        // TODO(roasbeef): also add favorable metrics for nodes
153
}
154

155
// GetNamespacedStore returns the MissionControl in the given namespace. If one
156
// does not yet exist, then it is initialised.
157
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
158
        error) {
1✔
159

1✔
160
        m.mu.Lock()
1✔
161
        defer m.mu.Unlock()
1✔
162

1✔
163
        if mc, ok := m.mc[ns]; ok {
2✔
164
                return mc, nil
1✔
165
        }
1✔
166

167
        return m.initMissionControl(ns)
×
168
}
169

170
// ListNamespaces returns a list of the namespaces that the MissionController
171
// is aware of.
172
func (m *MissionController) ListNamespaces() []string {
×
173
        m.mu.Lock()
×
174
        defer m.mu.Unlock()
×
175

×
176
        namespaces := make([]string, 0, len(m.mc))
×
177
        for ns := range m.mc {
×
178
                namespaces = append(namespaces, ns)
×
179
        }
×
180

181
        return namespaces
×
182
}
183

184
// MissionControlConfig defines parameters that control mission control
185
// behaviour.
186
type MissionControlConfig struct {
187
        // Estimator gives probability estimates for node pairs.
188
        Estimator Estimator
189

190
        // OnConfigUpdate is function that is called whenever the
191
        // mission control state is updated.
192
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
193

194
        // MaxMcHistory defines the maximum number of payment results that are
195
        // held on disk.
196
        MaxMcHistory int
197

198
        // McFlushInterval defines the ticker interval when we flush the
199
        // accumulated state to the DB.
200
        McFlushInterval time.Duration
201

202
        // MinFailureRelaxInterval is the minimum time that must have passed
203
        // since the previously recorded failure before the failure amount may
204
        // be raised.
205
        MinFailureRelaxInterval time.Duration
206
}
207

208
func (c *MissionControlConfig) validate() error {
1✔
209
        if c.MaxMcHistory < 0 {
1✔
210
                return ErrInvalidMcHistory
×
211
        }
×
212

213
        if c.MinFailureRelaxInterval < 0 {
1✔
214
                return ErrInvalidFailureInterval
×
215
        }
×
216

217
        return nil
1✔
218
}
219

220
// String returns a string representation of a mission control config.
221
func (c *MissionControlConfig) String() string {
1✔
222
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
1✔
223
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
1✔
224
}
1✔
225

226
// TimedPairResult describes a timestamped pair result.
227
type TimedPairResult struct {
228
        // FailTime is the time of the last failure.
229
        FailTime time.Time
230

231
        // FailAmt is the amount of the last failure. This amount may be pushed
232
        // up if a later success is higher than the last failed amount.
233
        FailAmt lnwire.MilliSatoshi
234

235
        // SuccessTime is the time of the last success.
236
        SuccessTime time.Time
237

238
        // SuccessAmt is the highest amount that successfully forwarded. This
239
        // isn't necessarily the last success amount. The value of this field
240
        // may also be pushed down if a later failure is lower than the highest
241
        // success amount. Because of this, SuccessAmt may not match
242
        // SuccessTime.
243
        SuccessAmt lnwire.MilliSatoshi
244
}
245

246
// MissionControlSnapshot contains a snapshot of the current state of mission
247
// control.
248
type MissionControlSnapshot struct {
249
        // Pairs is a list of channels for which specific information is
250
        // logged.
251
        Pairs []MissionControlPairSnapshot
252
}
253

254
// MissionControlPairSnapshot contains a snapshot of the current node pair
255
// state in mission control.
256
type MissionControlPairSnapshot struct {
257
        // Pair is the node pair of which the state is described.
258
        Pair DirectedNodePair
259

260
        // TimedPairResult contains the data for this pair.
261
        TimedPairResult
262
}
263

264
// paymentResult is the information that becomes available when a payment
265
// attempt completes.
266
type paymentResult struct {
267
        id        uint64
268
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
269
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
270
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
271

272
        // failure holds information related to the failure of a payment. The
273
        // presence of this record indicates a payment failure. The absence of
274
        // this record indicates a successful payment.
275
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
276
}
277

278
// newPaymentResult constructs a new paymentResult.
279
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
280
        failure *paymentFailure) *paymentResult {
1✔
281

1✔
282
        result := &paymentResult{
1✔
283
                id: id,
1✔
284
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
1✔
285
                        uint64(timeFwd.UnixNano()),
1✔
286
                ),
1✔
287
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
1✔
288
                        uint64(timeReply.UnixNano()),
1✔
289
                ),
1✔
290
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
1✔
291
        }
1✔
292

1✔
293
        if failure != nil {
2✔
294
                result.failure = tlv.SomeRecordT(
1✔
295
                        tlv.NewRecordT[tlv.TlvType3](*failure),
1✔
296
                )
1✔
297
        }
1✔
298

299
        return result
1✔
300
}
301

302
// NewMissionController returns a new instance of MissionController.
303
func NewMissionController(db kvdb.Backend, self route.Vertex,
304
        cfg *MissionControlConfig) (*MissionController, error) {
1✔
305

1✔
306
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
1✔
307
                cfg.Estimator)
1✔
308

1✔
309
        if err := cfg.validate(); err != nil {
1✔
310
                return nil, err
×
311
        }
×
312

313
        mcCfg := &mcConfig{
1✔
314
                clock:    clock.NewDefaultClock(),
1✔
315
                selfNode: self,
1✔
316
        }
1✔
317

1✔
318
        mgr := &MissionController{
1✔
319
                db:           db,
1✔
320
                defaultMCCfg: cfg,
1✔
321
                cfg:          mcCfg,
1✔
322
                mc:           make(map[string]*MissionControl),
1✔
323
        }
1✔
324

1✔
325
        if err := mgr.loadMissionControls(); err != nil {
1✔
326
                return nil, err
×
327
        }
×
328

329
        for _, mc := range mgr.mc {
2✔
330
                if err := mc.init(); err != nil {
1✔
331
                        return nil, err
×
332
                }
×
333
        }
334

335
        return mgr, nil
1✔
336
}
337

338
// loadMissionControls initialises a MissionControl in the default namespace if
339
// one does not yet exist. It then initialises a MissionControl for all other
340
// namespaces found in the DB.
341
//
342
// NOTE: this should only be called once during MissionController construction.
343
func (m *MissionController) loadMissionControls() error {
1✔
344
        m.mu.Lock()
1✔
345
        defer m.mu.Unlock()
1✔
346

1✔
347
        // Always initialise the default namespace.
1✔
348
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
1✔
349
        if err != nil {
1✔
350
                return err
×
351
        }
×
352

353
        namespaces := make(map[string]struct{})
1✔
354
        err = m.db.View(func(tx walletdb.ReadTx) error {
2✔
355
                mcStoreBkt := tx.ReadBucket(resultsKey)
1✔
356
                if mcStoreBkt == nil {
1✔
357
                        return fmt.Errorf("top level mission control bucket " +
×
358
                                "not found")
×
359
                }
×
360

361
                // Iterate through all the keys in the bucket and collect the
362
                // namespaces.
363
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
2✔
364
                        // We've already initialised the default namespace so
1✔
365
                        // we can skip it.
1✔
366
                        if string(k) == DefaultMissionControlNamespace {
2✔
367
                                return nil
1✔
368
                        }
1✔
369

370
                        namespaces[string(k)] = struct{}{}
×
371

×
372
                        return nil
×
373
                })
374
        }, func() {})
1✔
375
        if err != nil {
1✔
376
                return err
×
377
        }
×
378

379
        // Now, iterate through all the namespaces and initialise them.
380
        for ns := range namespaces {
1✔
381
                _, err = m.initMissionControl(ns)
×
382
                if err != nil {
×
383
                        return err
×
384
                }
×
385
        }
386

387
        return nil
1✔
388
}
389

390
// initMissionControl creates a new MissionControl instance with the given
391
// namespace if one does not yet exist.
392
//
393
// NOTE: the MissionController's mutex must be held before calling this method.
394
func (m *MissionController) initMissionControl(namespace string) (
395
        *MissionControl, error) {
1✔
396

1✔
397
        // If a mission control with this namespace has already been initialised
1✔
398
        // then there is nothing left to do.
1✔
399
        if mc, ok := m.mc[namespace]; ok {
1✔
400
                return mc, nil
×
401
        }
×
402

403
        cfg := m.defaultMCCfg
1✔
404

1✔
405
        store, err := newMissionControlStore(
1✔
406
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
1✔
407
                cfg.McFlushInterval,
1✔
408
        )
1✔
409
        if err != nil {
1✔
410
                return nil, err
×
411
        }
×
412

413
        mc := &MissionControl{
1✔
414
                cfg:       m.cfg,
1✔
415
                state:     newMissionControlState(cfg.MinFailureRelaxInterval),
1✔
416
                store:     store,
1✔
417
                estimator: cfg.Estimator,
1✔
418
                log: build.NewPrefixLog(
1✔
419
                        fmt.Sprintf("[%s]:", namespace), log,
1✔
420
                ),
1✔
421
                onConfigUpdate: cfg.OnConfigUpdate,
1✔
422
        }
1✔
423

1✔
424
        m.mc[namespace] = mc
1✔
425

1✔
426
        return mc, nil
1✔
427
}
428

429
// RunStoreTickers runs the mission controller store's tickers.
430
func (m *MissionController) RunStoreTickers() {
1✔
431
        m.mu.Lock()
1✔
432
        defer m.mu.Unlock()
1✔
433

1✔
434
        for _, mc := range m.mc {
2✔
435
                mc.store.run()
1✔
436
        }
1✔
437
}
438

439
// StopStoreTickers stops the mission control store's tickers.
440
func (m *MissionController) StopStoreTickers() {
1✔
441
        log.Debug("Stopping mission control store ticker")
1✔
442
        defer log.Debug("Mission control store ticker stopped")
1✔
443

1✔
444
        m.mu.Lock()
1✔
445
        defer m.mu.Unlock()
1✔
446

1✔
447
        for _, mc := range m.mc {
2✔
448
                mc.store.stop()
1✔
449
        }
1✔
450
}
451

452
// init initializes mission control with historical data.
453
func (m *MissionControl) init() error {
1✔
454
        m.log.Debugf("Mission control state reconstruction started")
1✔
455

1✔
456
        m.mu.Lock()
1✔
457
        defer m.mu.Unlock()
1✔
458

1✔
459
        start := time.Now()
1✔
460

1✔
461
        results, err := m.store.fetchAll()
1✔
462
        if err != nil {
1✔
463
                return err
×
464
        }
×
465

466
        for _, result := range results {
2✔
467
                _ = m.applyPaymentResult(result)
1✔
468
        }
1✔
469

470
        m.log.Debugf("Mission control state reconstruction finished: "+
1✔
471
                "n=%v, time=%v", len(results), time.Since(start))
1✔
472

1✔
473
        return nil
1✔
474
}
475

476
// GetConfig returns the config that mission control is currently configured
477
// with. All fields are copied by value, so we do not need to worry about
478
// mutation.
479
func (m *MissionControl) GetConfig() *MissionControlConfig {
1✔
480
        m.mu.Lock()
1✔
481
        defer m.mu.Unlock()
1✔
482

1✔
483
        return &MissionControlConfig{
1✔
484
                Estimator:               m.estimator,
1✔
485
                MaxMcHistory:            m.store.maxRecords,
1✔
486
                McFlushInterval:         m.store.flushInterval,
1✔
487
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
1✔
488
        }
1✔
489
}
1✔
490

491
// SetConfig validates the config provided and updates mission control's config
492
// if it is valid.
493
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
1✔
494
        if cfg == nil {
1✔
495
                return errors.New("nil mission control config")
×
496
        }
×
497

498
        if err := cfg.validate(); err != nil {
1✔
499
                return err
×
500
        }
×
501

502
        m.mu.Lock()
1✔
503
        defer m.mu.Unlock()
1✔
504

1✔
505
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
1✔
506
                cfg.Estimator)
1✔
507

1✔
508
        m.store.maxRecords = cfg.MaxMcHistory
1✔
509
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
1✔
510
        m.estimator = cfg.Estimator
1✔
511

1✔
512
        // Execute the callback function if it is set.
1✔
513
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
2✔
514
                f(cfg)
1✔
515
        })
1✔
516

517
        return nil
1✔
518
}
519

520
// ResetHistory resets the history of MissionControl returning it to a state as
521
// if no payment attempts have been made.
522
func (m *MissionControl) ResetHistory() error {
1✔
523
        m.mu.Lock()
1✔
524
        defer m.mu.Unlock()
1✔
525

1✔
526
        if err := m.store.clear(); err != nil {
1✔
527
                return err
×
528
        }
×
529

530
        m.state.resetHistory()
1✔
531

1✔
532
        m.log.Debugf("Mission control history cleared")
1✔
533

1✔
534
        return nil
1✔
535
}
536

537
// GetProbability is expected to return the success probability of a payment
538
// from fromNode along edge.
539
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
540
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
1✔
541

1✔
542
        m.mu.Lock()
1✔
543
        defer m.mu.Unlock()
1✔
544

1✔
545
        now := m.cfg.clock.Now()
1✔
546
        results, _ := m.state.getLastPairResult(fromNode)
1✔
547

1✔
548
        // Use a distinct probability estimation function for local channels.
1✔
549
        if fromNode == m.cfg.selfNode {
2✔
550
                return m.estimator.LocalPairProbability(now, results, toNode)
1✔
551
        }
1✔
552

553
        return m.estimator.PairProbability(
1✔
554
                now, results, toNode, amt, capacity,
1✔
555
        )
1✔
556
}
557

558
// GetHistorySnapshot takes a snapshot from the current mission control state
559
// and actual probability estimates.
560
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
×
561
        m.mu.Lock()
×
562
        defer m.mu.Unlock()
×
563

×
564
        m.log.Debugf("Requesting history snapshot from mission control")
×
565

×
566
        return m.state.getSnapshot()
×
567
}
×
568

569
// ImportHistory imports the set of mission control results provided to our
570
// in-memory state. These results are not persisted, so will not survive
571
// restarts.
572
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
573
        force bool) error {
1✔
574

1✔
575
        if history == nil {
1✔
576
                return errors.New("cannot import nil history")
×
577
        }
×
578

579
        m.mu.Lock()
1✔
580
        defer m.mu.Unlock()
1✔
581

1✔
582
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
1✔
583
                "control", len(history.Pairs))
1✔
584

1✔
585
        imported := m.state.importSnapshot(history, force)
1✔
586

1✔
587
        m.log.Infof("Imported %v results to mission control", imported)
1✔
588

1✔
589
        return nil
1✔
590
}
591

592
// GetPairHistorySnapshot returns the stored history for a given node pair.
593
func (m *MissionControl) GetPairHistorySnapshot(
594
        fromNode, toNode route.Vertex) TimedPairResult {
1✔
595

1✔
596
        m.mu.Lock()
1✔
597
        defer m.mu.Unlock()
1✔
598

1✔
599
        results, ok := m.state.getLastPairResult(fromNode)
1✔
600
        if !ok {
2✔
601
                return TimedPairResult{}
1✔
602
        }
1✔
603

604
        result, ok := results[toNode]
1✔
605
        if !ok {
2✔
606
                return TimedPairResult{}
1✔
607
        }
1✔
608

609
        return result
1✔
610
}
611

612
// ReportPaymentFail reports a failed payment to mission control as input for
613
// future probability estimates. The failureSourceIdx argument indicates the
614
// failure source. If it is nil, the failure source is unknown. This function
615
// returns a reason if this failure is a final failure. In that case no further
616
// payment attempts need to be made.
617
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
618
        failureSourceIdx *int, failure lnwire.FailureMessage) (
619
        *channeldb.FailureReason, error) {
1✔
620

1✔
621
        timestamp := m.cfg.clock.Now()
1✔
622

1✔
623
        result := newPaymentResult(
1✔
624
                paymentID, extractMCRoute(rt), timestamp, timestamp,
1✔
625
                newPaymentFailure(failureSourceIdx, failure),
1✔
626
        )
1✔
627

1✔
628
        return m.processPaymentResult(result)
1✔
629
}
1✔
630

631
// ReportPaymentSuccess reports a successful payment to mission control as input
632
// for future probability estimates.
633
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
634
        rt *route.Route) error {
1✔
635

1✔
636
        timestamp := m.cfg.clock.Now()
1✔
637

1✔
638
        result := newPaymentResult(
1✔
639
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
1✔
640
        )
1✔
641

1✔
642
        _, err := m.processPaymentResult(result)
1✔
643

1✔
644
        return err
1✔
645
}
1✔
646

647
// processPaymentResult stores a payment result in the mission control store and
648
// updates mission control's in-memory state.
649
func (m *MissionControl) processPaymentResult(result *paymentResult) (
650
        *channeldb.FailureReason, error) {
1✔
651

1✔
652
        // Store complete result in database.
1✔
653
        m.store.AddResult(result)
1✔
654

1✔
655
        m.mu.Lock()
1✔
656
        defer m.mu.Unlock()
1✔
657

1✔
658
        // Apply result to update mission control state.
1✔
659
        reason := m.applyPaymentResult(result)
1✔
660

1✔
661
        return reason, nil
1✔
662
}
1✔
663

664
// applyPaymentResult applies a payment result as input for future probability
665
// estimates. It returns a bool indicating whether this error is a final error
666
// and no further payment attempts need to be made.
667
func (m *MissionControl) applyPaymentResult(
668
        result *paymentResult) *channeldb.FailureReason {
1✔
669

1✔
670
        // Interpret result.
1✔
671
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
1✔
672

1✔
673
        if i.policyFailure != nil {
2✔
674
                if m.state.requestSecondChance(
1✔
675
                        time.Unix(0, int64(result.timeReply.Val)),
1✔
676
                        i.policyFailure.From, i.policyFailure.To,
1✔
677
                ) {
2✔
678
                        return nil
1✔
679
                }
1✔
680
        }
681

682
        // If there is a node-level failure, record a failure for every tried
683
        // connection of that node. A node-level failure can be considered as a
684
        // failure that would have occurred with any of the node's channels.
685
        //
686
        // Ideally we'd also record the failure for the untried connections of
687
        // the node. Unfortunately this would require access to the graph and
688
        // adding this dependency and db calls does not outweigh the benefits.
689
        //
690
        // Untried connections will fall back to the node probability. After the
691
        // call to setAllPairResult below, the node probability will be equal to
692
        // the probability of the tried channels except that the a priori
693
        // probability is mixed in too. This effect is controlled by the
694
        // aprioriWeight parameter. If that parameter isn't set to an extreme
695
        // and there are a few known connections, there shouldn't be much of a
696
        // difference. The largest difference occurs when aprioriWeight is 1. In
697
        // that case, a node-level failure would not be applied to untried
698
        // channels.
699
        if i.nodeFailure != nil {
1✔
700
                m.log.Debugf("Reporting node failure to Mission Control: "+
×
701
                        "node=%v", *i.nodeFailure)
×
702

×
703
                m.state.setAllFail(
×
704
                        *i.nodeFailure,
×
705
                        time.Unix(0, int64(result.timeReply.Val)),
×
706
                )
×
707
        }
×
708

709
        for pair, pairResult := range i.pairResults {
2✔
710
                pairResult := pairResult
1✔
711

1✔
712
                if pairResult.success {
2✔
713
                        m.log.Debugf("Reporting pair success to Mission "+
1✔
714
                                "Control: pair=%v, amt=%v",
1✔
715
                                pair, pairResult.amt)
1✔
716
                } else {
2✔
717
                        m.log.Debugf("Reporting pair failure to Mission "+
1✔
718
                                "Control: pair=%v, amt=%v",
1✔
719
                                pair, pairResult.amt)
1✔
720
                }
1✔
721

722
                m.state.setLastPairResult(
1✔
723
                        pair.From, pair.To,
1✔
724
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
1✔
725
                        false,
1✔
726
                )
1✔
727
        }
728

729
        return i.finalFailureReason
1✔
730
}
731

732
// namespacedDB is an implementation of the missionControlDB that gives a user
733
// of the interface access to a namespaced bucket within the top level mission
734
// control bucket.
735
type namespacedDB struct {
736
        topLevelBucketKey []byte
737
        namespace         []byte
738
        db                kvdb.Backend
739
}
740

741
// A compile-time check to ensure that namespacedDB implements missionControlDB.
742
var _ missionControlDB = (*namespacedDB)(nil)
743

744
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
745
// default namespace.
746
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
×
747
        return newNamespacedDB(db, DefaultMissionControlNamespace)
×
748
}
×
749

750
// newNamespacedDB creates a new instance of missionControlDB where the DB will
751
// have access to a namespaced bucket within the top level mission control
752
// bucket.
753
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
1✔
754
        return &namespacedDB{
1✔
755
                db:                db,
1✔
756
                namespace:         []byte(namespace),
1✔
757
                topLevelBucketKey: resultsKey,
1✔
758
        }
1✔
759
}
1✔
760

761
// update can be used to perform reads and writes on the given bucket.
762
//
763
// NOTE: this is part of the missionControlDB interface.
764
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
765
        reset func()) error {
1✔
766

1✔
767
        return n.db.Update(func(tx kvdb.RwTx) error {
2✔
768
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
1✔
769
                if err != nil {
1✔
770
                        return fmt.Errorf("cannot create top level mission "+
×
771
                                "control bucket: %w", err)
×
772
                }
×
773

774
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
1✔
775
                        n.namespace,
1✔
776
                )
1✔
777
                if err != nil {
1✔
778
                        return fmt.Errorf("cannot create namespaced bucket "+
×
779
                                "(%s) in mission control store: %w",
×
780
                                n.namespace, err)
×
781
                }
×
782

783
                return f(namespacedBkt)
1✔
784
        }, reset)
785
}
786

787
// view can be used to perform reads on the given bucket.
788
//
789
// NOTE: this is part of the missionControlDB interface.
790
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
791
        reset func()) error {
1✔
792

1✔
793
        return n.db.View(func(tx kvdb.RTx) error {
2✔
794
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
1✔
795
                if mcStoreBkt == nil {
1✔
796
                        return fmt.Errorf("top level mission control bucket " +
×
797
                                "not found")
×
798
                }
×
799

800
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
1✔
801
                if namespacedBkt == nil {
1✔
802
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
803
                                "in mission control store", n.namespace)
×
804
                }
×
805

806
                return f(namespacedBkt)
1✔
807
        }, reset)
808
}
809

810
// purge will delete all the contents in the namespace.
811
//
812
// NOTE: this is part of the missionControlDB interface.
813
func (n *namespacedDB) purge() error {
1✔
814
        return n.db.Update(func(tx kvdb.RwTx) error {
2✔
815
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
1✔
816
                if mcStoreBkt == nil {
1✔
817
                        return nil
×
818
                }
×
819

820
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
1✔
821
                if err != nil {
1✔
822
                        return err
×
823
                }
×
824

825
                _, err = mcStoreBkt.CreateBucket(n.namespace)
1✔
826

1✔
827
                return err
1✔
828
        }, func() {})
1✔
829
}
830

831
// paymentFailure represents the presence of a payment failure. It may or may
832
// not include additional information about said failure.
833
type paymentFailure struct {
834
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
835
}
836

837
// newPaymentFailure constructs a new paymentFailure struct. If the source
838
// index is nil, then an empty paymentFailure is returned. This represents a
839
// failure with unknown details. Otherwise, the index and failure message are
840
// used to populate the info field of the paymentFailure.
841
func newPaymentFailure(sourceIdx *int,
842
        failureMsg lnwire.FailureMessage) *paymentFailure {
1✔
843

1✔
844
        if sourceIdx == nil {
1✔
845
                return &paymentFailure{}
×
846
        }
×
847

848
        info := paymentFailureInfo{
1✔
849
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
1✔
850
                        uint8(*sourceIdx),
1✔
851
                ),
1✔
852
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
1✔
853
        }
1✔
854

1✔
855
        return &paymentFailure{
1✔
856
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
1✔
857
        }
1✔
858
}
859

860
// Record returns a TLV record that can be used to encode/decode a
861
// paymentFailure to/from a TLV stream.
862
func (r *paymentFailure) Record() tlv.Record {
1✔
863
        recordSize := func() uint64 {
2✔
864
                var (
1✔
865
                        b   bytes.Buffer
1✔
866
                        buf [8]byte
1✔
867
                )
1✔
868
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
1✔
869
                        panic(err)
×
870
                }
871

872
                return uint64(len(b.Bytes()))
1✔
873
        }
874

875
        return tlv.MakeDynamicRecord(
1✔
876
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
1✔
877
        )
1✔
878
}
879

880
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
1✔
881
        if v, ok := val.(*paymentFailure); ok {
2✔
882
                var recordProducers []tlv.RecordProducer
1✔
883
                v.info.WhenSome(
1✔
884
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
2✔
885
                                recordProducers = append(recordProducers, &r)
1✔
886
                        },
1✔
887
                )
888

889
                return lnwire.EncodeRecordsTo(
1✔
890
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
1✔
891
                )
1✔
892
        }
893

894
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
895
}
896

897
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
898
        l uint64) error {
1✔
899

1✔
900
        if v, ok := val.(*paymentFailure); ok {
2✔
901
                var h paymentFailure
1✔
902

1✔
903
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
1✔
904
                typeMap, err := lnwire.DecodeRecords(
1✔
905
                        r, lnwire.ProduceRecordsSorted(&info)...,
1✔
906
                )
1✔
907
                if err != nil {
1✔
908
                        return err
×
909
                }
×
910

911
                if _, ok := typeMap[h.info.TlvType()]; ok {
2✔
912
                        h.info = tlv.SomeRecordT(info)
1✔
913
                }
1✔
914

915
                *v = h
1✔
916

1✔
917
                return nil
1✔
918
        }
919

920
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
921
}
922

923
// paymentFailureInfo holds additional information about a payment failure.
924
type paymentFailureInfo struct {
925
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
926
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
927
}
928

929
// Record returns a TLV record that can be used to encode/decode a
930
// paymentFailureInfo to/from a TLV stream.
931
func (r *paymentFailureInfo) Record() tlv.Record {
1✔
932
        recordSize := func() uint64 {
2✔
933
                var (
1✔
934
                        b   bytes.Buffer
1✔
935
                        buf [8]byte
1✔
936
                )
1✔
937
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
1✔
938
                        panic(err)
×
939
                }
940

941
                return uint64(len(b.Bytes()))
1✔
942
        }
943

944
        return tlv.MakeDynamicRecord(
1✔
945
                0, r, recordSize, encodePaymentFailureInfo,
1✔
946
                decodePaymentFailureInfo,
1✔
947
        )
1✔
948
}
949

950
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
1✔
951
        if v, ok := val.(*paymentFailureInfo); ok {
2✔
952
                return lnwire.EncodeRecordsTo(
1✔
953
                        w, lnwire.ProduceRecordsSorted(
1✔
954
                                &v.sourceIdx, &v.msg,
1✔
955
                        ),
1✔
956
                )
1✔
957
        }
1✔
958

959
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
960
}
961

962
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
963
        l uint64) error {
1✔
964

1✔
965
        if v, ok := val.(*paymentFailureInfo); ok {
2✔
966
                var h paymentFailureInfo
1✔
967

1✔
968
                _, err := lnwire.DecodeRecords(
1✔
969
                        r,
1✔
970
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
1✔
971
                )
1✔
972
                if err != nil {
1✔
973
                        return err
×
974
                }
×
975

976
                *v = h
1✔
977

1✔
978
                return nil
1✔
979
        }
980

981
        return tlv.NewTypeForDecodingErr(
×
982
                val, "routing.paymentFailureInfo", l, l,
×
983
        )
×
984
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc