• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.12
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
	// DefaultPenaltyHalfLife is the half-life duration used by default.
	// The half-life is the amount of time after which a penalized node or
	// channel is back at a 50% probability.
	DefaultPenaltyHalfLife = time.Hour

	// minSecondChanceInterval is the minimum amount of time that has to
	// separate two second-chance failures.
	//
	// A node that returns a channel policy related failure may be given a
	// second chance to forward the payment, because the channel policy we
	// know about may simply be stale. This matters most for mobile apps
	// that spend most of their time offline.
	//
	// At the same time, nodes must not be able to keep us busy by
	// endlessly returning new channel updates until the payment loop
	// times out.
	//
	// A second chance is therefore only granted if the previous second
	// chance lies far enough in the past, which is what this interval
	// defines. A second policy failure arriving within the interval
	// results in a penalty.
	//
	// Granted second chances are tracked per node pair, so a node with
	// several channels to the same peer only gets a single second chance
	// to route to that peer. Since nodes forward non-strict, a less
	// restrictive channel-level tracking scheme isn't needed here.
	minSecondChanceInterval = time.Minute

	// DefaultMaxMcHistory is the maximum history size used by default.
	DefaultMaxMcHistory = 1000

	// DefaultMcFlushInterval is the interval used by default for flushing
	// MC state to the database.
	DefaultMcFlushInterval = time.Second

	// prevSuccessProbability is the probability assumed for node pairs
	// that successfully relayed the previous attempt.
	prevSuccessProbability = 0.95

	// DefaultAprioriWeight is the a priori weight used by default. See
	// MissionControlConfig for further explanation.
	DefaultAprioriWeight = 0.5

	// DefaultMinFailureRelaxInterval is the default for the minimum time
	// that must have passed since the previously recorded failure before
	// the failure amount may be raised.
	DefaultMinFailureRelaxInterval = time.Minute

	// DefaultFeeEstimationTimeout is the default for FeeEstimationTimeout:
	// the maximum duration that the probing fee estimation is allowed to
	// take.
	DefaultFeeEstimationTimeout = time.Minute

	// DefaultMissionControlNamespace is the name of the default mission
	// control namespace. It is used as the sub-bucket key within the top
	// level DB bucket in which mission control results are stored.
	DefaultMissionControlNamespace = "default"
)
83

84
var (
	// ErrInvalidMcHistory is the error returned when a negative mission
	// control history count is supplied.
	ErrInvalidMcHistory = errors.New(
		"mission control history must be >= 0",
	)

	// ErrInvalidFailureInterval is the error returned when an invalid
	// failure interval is supplied.
	ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
)
94

95
// NodeResults contains previous results from a node to its peers.
96
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
99
// MissionControl instances and will be the same regardless of namespace.
100
type mcConfig struct {
101
        // clock is a time source used by mission control.
102
        clock clock.Clock
103

104
        // selfNode is our pubkey.
105
        selfNode route.Vertex
106
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
109
// routing by external callers when sending payments throughout the network. It
110
// acts as a shared memory during routing attempts with the goal to optimize the
111
// payment attempt success rate.
112
//
113
// Failed payment attempts are reported to mission control. These reports are
114
// used to track the time of the last node or channel level failure. The time
115
// since the last failure is used to estimate a success probability that is fed
116
// into the path finding process for subsequent payment attempts.
117
type MissionControl struct {
118
        cfg *mcConfig
119

120
        // state is the internal mission control state that is input for
121
        // probability estimation.
122
        state *missionControlState
123

124
        store *missionControlStore
125

126
        // estimator is the probability estimator that is used with the payment
127
        // results that mission control collects.
128
        estimator Estimator
129

130
        // onConfigUpdate is a function that is called whenever the
131
        // mission control state is updated.
132
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
133

134
        log btclog.Logger
135

136
        mu sync.Mutex
137
}
138

139
// MissionController manages MissionControl instances in various namespaces.
140
type MissionController struct {
141
        db           kvdb.Backend
142
        cfg          *mcConfig
143
        defaultMCCfg *MissionControlConfig
144

145
        mc map[string]*MissionControl
146
        mu sync.Mutex
147

148
        // TODO(roasbeef): further counters, if vertex continually unavailable,
149
        // add to another generation
150

151
        // TODO(roasbeef): also add favorable metrics for nodes
152
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
3✔
158

3✔
159
        m.mu.Lock()
3✔
160
        defer m.mu.Unlock()
3✔
161

3✔
162
        if mc, ok := m.mc[ns]; ok {
6✔
163
                return mc, nil
3✔
164
        }
3✔
165

166
        return m.initMissionControl(ns)
×
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
171
func (m *MissionController) ListNamespaces() []string {
×
172
        m.mu.Lock()
×
173
        defer m.mu.Unlock()
×
174

×
175
        namespaces := make([]string, 0, len(m.mc))
×
176
        for ns := range m.mc {
×
177
                namespaces = append(namespaces, ns)
×
178
        }
×
179

180
        return namespaces
×
181
}
182

183
// MissionControlConfig defines parameters that control mission control
184
// behaviour.
185
type MissionControlConfig struct {
186
        // Estimator gives probability estimates for node pairs.
187
        Estimator Estimator
188

189
        // OnConfigUpdate is function that is called whenever the
190
        // mission control state is updated.
191
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
192

193
        // MaxMcHistory defines the maximum number of payment results that are
194
        // held on disk.
195
        MaxMcHistory int
196

197
        // McFlushInterval defines the ticker interval when we flush the
198
        // accumulated state to the DB.
199
        McFlushInterval time.Duration
200

201
        // MinFailureRelaxInterval is the minimum time that must have passed
202
        // since the previously recorded failure before the failure amount may
203
        // be raised.
204
        MinFailureRelaxInterval time.Duration
205
}
206

207
func (c *MissionControlConfig) validate() error {
3✔
208
        if c.MaxMcHistory < 0 {
3✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
3✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
3✔
217
}
218

219
// String returns a string representation of a mission control config.
220
func (c *MissionControlConfig) String() string {
3✔
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
3✔
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
3✔
223
}
3✔
224

225
// TimedPairResult describes a timestamped pair result.
226
type TimedPairResult struct {
227
        // FailTime is the time of the last failure.
228
        FailTime time.Time
229

230
        // FailAmt is the amount of the last failure. This amount may be pushed
231
        // up if a later success is higher than the last failed amount.
232
        FailAmt lnwire.MilliSatoshi
233

234
        // SuccessTime is the time of the last success.
235
        SuccessTime time.Time
236

237
        // SuccessAmt is the highest amount that successfully forwarded. This
238
        // isn't necessarily the last success amount. The value of this field
239
        // may also be pushed down if a later failure is lower than the highest
240
        // success amount. Because of this, SuccessAmt may not match
241
        // SuccessTime.
242
        SuccessAmt lnwire.MilliSatoshi
243
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
246
// control.
247
type MissionControlSnapshot struct {
248
        // Pairs is a list of channels for which specific information is
249
        // logged.
250
        Pairs []MissionControlPairSnapshot
251
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
254
// state in mission control.
255
type MissionControlPairSnapshot struct {
256
        // Pair is the node pair of which the state is described.
257
        Pair DirectedNodePair
258

259
        // TimedPairResult contains the data for this pair.
260
        TimedPairResult
261
}
262

263
// paymentResult is the information that becomes available when a payment
264
// attempt completes.
265
type paymentResult struct {
266
        id        uint64
267
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
268
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
269
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
270

271
        // failure holds information related to the failure of a payment. The
272
        // presence of this record indicates a payment failure. The absence of
273
        // this record indicates a successful payment.
274
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
275
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure *paymentFailure) *paymentResult {
3✔
280

3✔
281
        result := &paymentResult{
3✔
282
                id: id,
3✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
3✔
284
                        uint64(timeFwd.UnixNano()),
3✔
285
                ),
3✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
3✔
287
                        uint64(timeReply.UnixNano()),
3✔
288
                ),
3✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
3✔
290
        }
3✔
291

3✔
292
        if failure != nil {
6✔
293
                result.failure = tlv.SomeRecordT(
3✔
294
                        tlv.NewRecordT[tlv.TlvType3](*failure),
3✔
295
                )
3✔
296
        }
3✔
297

298
        return result
3✔
299
}
300

301
// NewMissionController returns a new instance of MissionController.
302
func NewMissionController(db kvdb.Backend, self route.Vertex,
303
        cfg *MissionControlConfig) (*MissionController, error) {
3✔
304

3✔
305
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
3✔
306
                cfg.Estimator)
3✔
307

3✔
308
        if err := cfg.validate(); err != nil {
3✔
309
                return nil, err
×
310
        }
×
311

312
        mcCfg := &mcConfig{
3✔
313
                clock:    clock.NewDefaultClock(),
3✔
314
                selfNode: self,
3✔
315
        }
3✔
316

3✔
317
        mgr := &MissionController{
3✔
318
                db:           db,
3✔
319
                defaultMCCfg: cfg,
3✔
320
                cfg:          mcCfg,
3✔
321
                mc:           make(map[string]*MissionControl),
3✔
322
        }
3✔
323

3✔
324
        if err := mgr.loadMissionControls(); err != nil {
3✔
325
                return nil, err
×
326
        }
×
327

328
        for _, mc := range mgr.mc {
6✔
329
                if err := mc.init(); err != nil {
3✔
330
                        return nil, err
×
331
                }
×
332
        }
333

334
        return mgr, nil
3✔
335
}
336

337
// loadMissionControls initialises a MissionControl in the default namespace if
338
// one does not yet exist. It then initialises a MissionControl for all other
339
// namespaces found in the DB.
340
//
341
// NOTE: this should only be called once during MissionController construction.
342
func (m *MissionController) loadMissionControls() error {
3✔
343
        m.mu.Lock()
3✔
344
        defer m.mu.Unlock()
3✔
345

3✔
346
        // Always initialise the default namespace.
3✔
347
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
3✔
348
        if err != nil {
3✔
349
                return err
×
350
        }
×
351

352
        namespaces := make(map[string]struct{})
3✔
353
        err = m.db.View(func(tx walletdb.ReadTx) error {
6✔
354
                mcStoreBkt := tx.ReadBucket(resultsKey)
3✔
355
                if mcStoreBkt == nil {
3✔
356
                        return fmt.Errorf("top level mission control bucket " +
×
357
                                "not found")
×
358
                }
×
359

360
                // Iterate through all the keys in the bucket and collect the
361
                // namespaces.
362
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
6✔
363
                        // We've already initialised the default namespace so
3✔
364
                        // we can skip it.
3✔
365
                        if string(k) == DefaultMissionControlNamespace {
6✔
366
                                return nil
3✔
367
                        }
3✔
368

369
                        namespaces[string(k)] = struct{}{}
×
370

×
371
                        return nil
×
372
                })
373
        }, func() {})
3✔
374
        if err != nil {
3✔
375
                return err
×
376
        }
×
377

378
        // Now, iterate through all the namespaces and initialise them.
379
        for ns := range namespaces {
3✔
380
                _, err = m.initMissionControl(ns)
×
381
                if err != nil {
×
382
                        return err
×
383
                }
×
384
        }
385

386
        return nil
3✔
387
}
388

389
// initMissionControl creates a new MissionControl instance with the given
390
// namespace if one does not yet exist.
391
//
392
// NOTE: the MissionController's mutex must be held before calling this method.
393
func (m *MissionController) initMissionControl(namespace string) (
394
        *MissionControl, error) {
3✔
395

3✔
396
        // If a mission control with this namespace has already been initialised
3✔
397
        // then there is nothing left to do.
3✔
398
        if mc, ok := m.mc[namespace]; ok {
3✔
399
                return mc, nil
×
400
        }
×
401

402
        cfg := m.defaultMCCfg
3✔
403

3✔
404
        store, err := newMissionControlStore(
3✔
405
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
3✔
406
                cfg.McFlushInterval,
3✔
407
        )
3✔
408
        if err != nil {
3✔
409
                return nil, err
×
410
        }
×
411

412
        mc := &MissionControl{
3✔
413
                cfg: m.cfg,
3✔
414
                state: newMissionControlState(
3✔
415
                        cfg.MinFailureRelaxInterval,
3✔
416
                ),
3✔
417
                store:          store,
3✔
418
                estimator:      cfg.Estimator,
3✔
419
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
3✔
420
                onConfigUpdate: cfg.OnConfigUpdate,
3✔
421
        }
3✔
422

3✔
423
        m.mc[namespace] = mc
3✔
424

3✔
425
        return mc, nil
3✔
426
}
427

428
// RunStoreTickers runs the mission controller store's tickers.
429
func (m *MissionController) RunStoreTickers() {
3✔
430
        m.mu.Lock()
3✔
431
        defer m.mu.Unlock()
3✔
432

3✔
433
        for _, mc := range m.mc {
6✔
434
                mc.store.run()
3✔
435
        }
3✔
436
}
437

438
// StopStoreTickers stops the mission control store's tickers.
439
func (m *MissionController) StopStoreTickers() {
3✔
440
        log.Debug("Stopping mission control store ticker")
3✔
441
        defer log.Debug("Mission control store ticker stopped")
3✔
442

3✔
443
        m.mu.Lock()
3✔
444
        defer m.mu.Unlock()
3✔
445

3✔
446
        for _, mc := range m.mc {
6✔
447
                mc.store.stop()
3✔
448
        }
3✔
449
}
450

451
// init initializes mission control with historical data.
452
func (m *MissionControl) init() error {
3✔
453
        m.log.Debugf("Mission control state reconstruction started")
3✔
454

3✔
455
        m.mu.Lock()
3✔
456
        defer m.mu.Unlock()
3✔
457

3✔
458
        start := time.Now()
3✔
459

3✔
460
        results, err := m.store.fetchAll()
3✔
461
        if err != nil {
3✔
462
                return err
×
463
        }
×
464

465
        for _, result := range results {
6✔
466
                _ = m.applyPaymentResult(result)
3✔
467
        }
3✔
468

469
        m.log.Debugf("Mission control state reconstruction finished: "+
3✔
470
                "n=%v, time=%v", len(results), time.Since(start))
3✔
471

3✔
472
        return nil
3✔
473
}
474

475
// GetConfig returns the config that mission control is currently configured
476
// with. All fields are copied by value, so we do not need to worry about
477
// mutation.
478
func (m *MissionControl) GetConfig() *MissionControlConfig {
3✔
479
        m.mu.Lock()
3✔
480
        defer m.mu.Unlock()
3✔
481

3✔
482
        return &MissionControlConfig{
3✔
483
                Estimator:               m.estimator,
3✔
484
                MaxMcHistory:            m.store.maxRecords,
3✔
485
                McFlushInterval:         m.store.flushInterval,
3✔
486
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
3✔
487
        }
3✔
488
}
3✔
489

490
// SetConfig validates the config provided and updates mission control's config
491
// if it is valid.
492
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
3✔
493
        if cfg == nil {
3✔
494
                return errors.New("nil mission control config")
×
495
        }
×
496

497
        if err := cfg.validate(); err != nil {
3✔
498
                return err
×
499
        }
×
500

501
        m.mu.Lock()
3✔
502
        defer m.mu.Unlock()
3✔
503

3✔
504
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
3✔
505
                cfg.Estimator)
3✔
506

3✔
507
        m.store.maxRecords = cfg.MaxMcHistory
3✔
508
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
3✔
509
        m.estimator = cfg.Estimator
3✔
510

3✔
511
        // Execute the callback function if it is set.
3✔
512
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
6✔
513
                f(cfg)
3✔
514
        })
3✔
515

516
        return nil
3✔
517
}
518

519
// ResetHistory resets the history of MissionControl returning it to a state as
520
// if no payment attempts have been made.
521
func (m *MissionControl) ResetHistory() error {
3✔
522
        m.mu.Lock()
3✔
523
        defer m.mu.Unlock()
3✔
524

3✔
525
        if err := m.store.clear(); err != nil {
3✔
526
                return err
×
527
        }
×
528

529
        m.state.resetHistory()
3✔
530

3✔
531
        m.log.Debugf("Mission control history cleared")
3✔
532

3✔
533
        return nil
3✔
534
}
535

536
// GetProbability is expected to return the success probability of a payment
537
// from fromNode along edge.
538
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
539
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
3✔
540

3✔
541
        m.mu.Lock()
3✔
542
        defer m.mu.Unlock()
3✔
543

3✔
544
        now := m.cfg.clock.Now()
3✔
545
        results, _ := m.state.getLastPairResult(fromNode)
3✔
546

3✔
547
        // Use a distinct probability estimation function for local channels.
3✔
548
        if fromNode == m.cfg.selfNode {
6✔
549
                return m.estimator.LocalPairProbability(now, results, toNode)
3✔
550
        }
3✔
551

552
        return m.estimator.PairProbability(
3✔
553
                now, results, toNode, amt, capacity,
3✔
554
        )
3✔
555
}
556

557
// GetHistorySnapshot takes a snapshot from the current mission control state
558
// and actual probability estimates.
559
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
×
560
        m.mu.Lock()
×
561
        defer m.mu.Unlock()
×
562

×
563
        m.log.Debugf("Requesting history snapshot from mission control")
×
564

×
565
        return m.state.getSnapshot()
×
566
}
×
567

568
// ImportHistory imports the set of mission control results provided to our
569
// in-memory state. These results are not persisted, so will not survive
570
// restarts.
571
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
572
        force bool) error {
3✔
573

3✔
574
        if history == nil {
3✔
575
                return errors.New("cannot import nil history")
×
576
        }
×
577

578
        m.mu.Lock()
3✔
579
        defer m.mu.Unlock()
3✔
580

3✔
581
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
3✔
582
                "control", len(history.Pairs))
3✔
583

3✔
584
        imported := m.state.importSnapshot(history, force)
3✔
585

3✔
586
        m.log.Infof("Imported %v results to mission control", imported)
3✔
587

3✔
588
        return nil
3✔
589
}
590

591
// GetPairHistorySnapshot returns the stored history for a given node pair.
592
func (m *MissionControl) GetPairHistorySnapshot(
593
        fromNode, toNode route.Vertex) TimedPairResult {
3✔
594

3✔
595
        m.mu.Lock()
3✔
596
        defer m.mu.Unlock()
3✔
597

3✔
598
        results, ok := m.state.getLastPairResult(fromNode)
3✔
599
        if !ok {
6✔
600
                return TimedPairResult{}
3✔
601
        }
3✔
602

603
        result, ok := results[toNode]
3✔
604
        if !ok {
3✔
605
                return TimedPairResult{}
×
606
        }
×
607

608
        return result
3✔
609
}
610

611
// ReportPaymentFail reports a failed payment to mission control as input for
612
// future probability estimates. The failureSourceIdx argument indicates the
613
// failure source. If it is nil, the failure source is unknown. This function
614
// returns a reason if this failure is a final failure. In that case no further
615
// payment attempts need to be made.
616
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
617
        failureSourceIdx *int, failure lnwire.FailureMessage) (
618
        *channeldb.FailureReason, error) {
3✔
619

3✔
620
        timestamp := m.cfg.clock.Now()
3✔
621

3✔
622
        result := newPaymentResult(
3✔
623
                paymentID, extractMCRoute(rt), timestamp, timestamp,
3✔
624
                newPaymentFailure(failureSourceIdx, failure),
3✔
625
        )
3✔
626

3✔
627
        return m.processPaymentResult(result)
3✔
628
}
3✔
629

630
// ReportPaymentSuccess reports a successful payment to mission control as input
631
// for future probability estimates.
632
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
633
        rt *route.Route) error {
3✔
634

3✔
635
        timestamp := m.cfg.clock.Now()
3✔
636

3✔
637
        result := newPaymentResult(
3✔
638
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
3✔
639
        )
3✔
640

3✔
641
        _, err := m.processPaymentResult(result)
3✔
642

3✔
643
        return err
3✔
644
}
3✔
645

646
// processPaymentResult stores a payment result in the mission control store and
647
// updates mission control's in-memory state.
648
func (m *MissionControl) processPaymentResult(result *paymentResult) (
649
        *channeldb.FailureReason, error) {
3✔
650

3✔
651
        // Store complete result in database.
3✔
652
        m.store.AddResult(result)
3✔
653

3✔
654
        m.mu.Lock()
3✔
655
        defer m.mu.Unlock()
3✔
656

3✔
657
        // Apply result to update mission control state.
3✔
658
        reason := m.applyPaymentResult(result)
3✔
659

3✔
660
        return reason, nil
3✔
661
}
3✔
662

663
// applyPaymentResult applies a payment result as input for future probability
664
// estimates. It returns a bool indicating whether this error is a final error
665
// and no further payment attempts need to be made.
666
func (m *MissionControl) applyPaymentResult(
667
        result *paymentResult) *channeldb.FailureReason {
3✔
668

3✔
669
        // Interpret result.
3✔
670
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
3✔
671

3✔
672
        if i.policyFailure != nil {
6✔
673
                if m.state.requestSecondChance(
3✔
674
                        time.Unix(0, int64(result.timeReply.Val)),
3✔
675
                        i.policyFailure.From, i.policyFailure.To,
3✔
676
                ) {
6✔
677
                        return nil
3✔
678
                }
3✔
679
        }
680

681
        // If there is a node-level failure, record a failure for every tried
682
        // connection of that node. A node-level failure can be considered as a
683
        // failure that would have occurred with any of the node's channels.
684
        //
685
        // Ideally we'd also record the failure for the untried connections of
686
        // the node. Unfortunately this would require access to the graph and
687
        // adding this dependency and db calls does not outweigh the benefits.
688
        //
689
        // Untried connections will fall back to the node probability. After the
690
        // call to setAllPairResult below, the node probability will be equal to
691
        // the probability of the tried channels except that the a priori
692
        // probability is mixed in too. This effect is controlled by the
693
        // aprioriWeight parameter. If that parameter isn't set to an extreme
694
        // and there are a few known connections, there shouldn't be much of a
695
        // difference. The largest difference occurs when aprioriWeight is 1. In
696
        // that case, a node-level failure would not be applied to untried
697
        // channels.
698
        if i.nodeFailure != nil {
3✔
699
                m.log.Debugf("Reporting node failure to Mission Control: "+
×
700
                        "node=%v", *i.nodeFailure)
×
701

×
702
                m.state.setAllFail(
×
703
                        *i.nodeFailure,
×
704
                        time.Unix(0, int64(result.timeReply.Val)),
×
705
                )
×
706
        }
×
707

708
        for pair, pairResult := range i.pairResults {
6✔
709
                pairResult := pairResult
3✔
710

3✔
711
                if pairResult.success {
6✔
712
                        m.log.Debugf("Reporting pair success to Mission "+
3✔
713
                                "Control: pair=%v, amt=%v",
3✔
714
                                pair, pairResult.amt)
3✔
715
                } else {
6✔
716
                        m.log.Debugf("Reporting pair failure to Mission "+
3✔
717
                                "Control: pair=%v, amt=%v",
3✔
718
                                pair, pairResult.amt)
3✔
719
                }
3✔
720

721
                m.state.setLastPairResult(
3✔
722
                        pair.From, pair.To,
3✔
723
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
3✔
724
                        false,
3✔
725
                )
3✔
726
        }
727

728
        return i.finalFailureReason
3✔
729
}
730

731
// namespacedDB is an implementation of the missionControlDB that gives a user
732
// of the interface access to a namespaced bucket within the top level mission
733
// control bucket.
734
type namespacedDB struct {
735
        topLevelBucketKey []byte
736
        namespace         []byte
737
        db                kvdb.Backend
738
}
739

740
// A compile-time check to ensure that namespacedDB implements missionControlDB.
741
var _ missionControlDB = (*namespacedDB)(nil)
742

743
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
744
// default namespace.
745
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
×
746
        return newNamespacedDB(db, DefaultMissionControlNamespace)
×
747
}
×
748

749
// newNamespacedDB creates a new instance of missionControlDB where the DB will
750
// have access to a namespaced bucket within the top level mission control
751
// bucket.
752
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
3✔
753
        return &namespacedDB{
3✔
754
                db:                db,
3✔
755
                namespace:         []byte(namespace),
3✔
756
                topLevelBucketKey: resultsKey,
3✔
757
        }
3✔
758
}
3✔
759

760
// update can be used to perform reads and writes on the given bucket.
761
//
762
// NOTE: this is part of the missionControlDB interface.
763
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
764
        reset func()) error {
3✔
765

3✔
766
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
767
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
3✔
768
                if err != nil {
3✔
769
                        return fmt.Errorf("cannot create top level mission "+
×
770
                                "control bucket: %w", err)
×
771
                }
×
772

773
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
3✔
774
                        n.namespace,
3✔
775
                )
3✔
776
                if err != nil {
3✔
777
                        return fmt.Errorf("cannot create namespaced bucket "+
×
778
                                "(%s) in mission control store: %w",
×
779
                                n.namespace, err)
×
780
                }
×
781

782
                return f(namespacedBkt)
3✔
783
        }, reset)
784
}
785

786
// view can be used to perform reads on the given bucket.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
790
        reset func()) error {
3✔
791

3✔
792
        return n.db.View(func(tx kvdb.RTx) error {
6✔
793
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
3✔
794
                if mcStoreBkt == nil {
3✔
795
                        return fmt.Errorf("top level mission control bucket " +
×
796
                                "not found")
×
797
                }
×
798

799
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
3✔
800
                if namespacedBkt == nil {
3✔
801
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
802
                                "in mission control store", n.namespace)
×
803
                }
×
804

805
                return f(namespacedBkt)
3✔
806
        }, reset)
807
}
808

809
// purge will delete all the contents in the namespace.
810
//
811
// NOTE: this is part of the missionControlDB interface.
812
func (n *namespacedDB) purge() error {
3✔
813
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
814
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
815
                if mcStoreBkt == nil {
3✔
816
                        return nil
×
817
                }
×
818

819
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
820
                if err != nil {
3✔
821
                        return err
×
822
                }
×
823

824
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
825

3✔
826
                return err
3✔
827
        }, func() {})
3✔
828
}
829

830
// paymentFailure represents the presence of a payment failure. It may or may
831
// not include additional information about said failure.
832
type paymentFailure struct {
833
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
834
}
835

836
// newPaymentFailure constructs a new paymentFailure struct. If the source
837
// index is nil, then an empty paymentFailure is returned. This represents a
838
// failure with unknown details. Otherwise, the index and failure message are
839
// used to populate the info field of the paymentFailure.
840
func newPaymentFailure(sourceIdx *int,
841
        failureMsg lnwire.FailureMessage) *paymentFailure {
3✔
842

3✔
843
        if sourceIdx == nil {
3✔
844
                return &paymentFailure{}
×
845
        }
×
846

847
        info := paymentFailureInfo{
3✔
848
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
3✔
849
                        uint8(*sourceIdx),
3✔
850
                ),
3✔
851
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
3✔
852
        }
3✔
853

3✔
854
        return &paymentFailure{
3✔
855
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
3✔
856
        }
3✔
857
}
858

859
// Record returns a TLV record that can be used to encode/decode a
860
// paymentFailure to/from a TLV stream.
861
func (r *paymentFailure) Record() tlv.Record {
3✔
862
        recordSize := func() uint64 {
6✔
863
                var (
3✔
864
                        b   bytes.Buffer
3✔
865
                        buf [8]byte
3✔
866
                )
3✔
867
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
3✔
868
                        panic(err)
×
869
                }
870

871
                return uint64(len(b.Bytes()))
3✔
872
        }
873

874
        return tlv.MakeDynamicRecord(
3✔
875
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
3✔
876
        )
3✔
877
}
878

879
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
3✔
880
        if v, ok := val.(*paymentFailure); ok {
6✔
881
                var recordProducers []tlv.RecordProducer
3✔
882
                v.info.WhenSome(
3✔
883
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
6✔
884
                                recordProducers = append(recordProducers, &r)
3✔
885
                        },
3✔
886
                )
887

888
                return lnwire.EncodeRecordsTo(
3✔
889
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
3✔
890
                )
3✔
891
        }
892

893
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
894
}
895

896
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
897
        l uint64) error {
3✔
898

3✔
899
        if v, ok := val.(*paymentFailure); ok {
6✔
900
                var h paymentFailure
3✔
901

3✔
902
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
3✔
903
                typeMap, err := lnwire.DecodeRecords(
3✔
904
                        r, lnwire.ProduceRecordsSorted(&info)...,
3✔
905
                )
3✔
906
                if err != nil {
3✔
907
                        return err
×
908
                }
×
909

910
                if _, ok := typeMap[h.info.TlvType()]; ok {
6✔
911
                        h.info = tlv.SomeRecordT(info)
3✔
912
                }
3✔
913

914
                *v = h
3✔
915

3✔
916
                return nil
3✔
917
        }
918

919
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
920
}
921

922
// paymentFailureInfo holds additional information about a payment failure.
923
type paymentFailureInfo struct {
924
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
925
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
926
}
927

928
// Record returns a TLV record that can be used to encode/decode a
929
// paymentFailureInfo to/from a TLV stream.
930
func (r *paymentFailureInfo) Record() tlv.Record {
3✔
931
        recordSize := func() uint64 {
6✔
932
                var (
3✔
933
                        b   bytes.Buffer
3✔
934
                        buf [8]byte
3✔
935
                )
3✔
936
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
3✔
937
                        panic(err)
×
938
                }
939

940
                return uint64(len(b.Bytes()))
3✔
941
        }
942

943
        return tlv.MakeDynamicRecord(
3✔
944
                0, r, recordSize, encodePaymentFailureInfo,
3✔
945
                decodePaymentFailureInfo,
3✔
946
        )
3✔
947
}
948

949
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
3✔
950
        if v, ok := val.(*paymentFailureInfo); ok {
6✔
951
                return lnwire.EncodeRecordsTo(
3✔
952
                        w, lnwire.ProduceRecordsSorted(
3✔
953
                                &v.sourceIdx, &v.msg,
3✔
954
                        ),
3✔
955
                )
3✔
956
        }
3✔
957

958
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
959
}
960

961
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
962
        l uint64) error {
3✔
963

3✔
964
        if v, ok := val.(*paymentFailureInfo); ok {
6✔
965
                var h paymentFailureInfo
3✔
966

3✔
967
                _, err := lnwire.DecodeRecords(
3✔
968
                        r,
3✔
969
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
3✔
970
                )
3✔
971
                if err != nil {
3✔
972
                        return err
×
973
                }
×
974

975
                *v = h
3✔
976

3✔
977
                return nil
3✔
978
        }
979

980
        return tlv.NewTypeForDecodingErr(
×
981
                val, "routing.paymentFailureInfo", l, l,
×
982
        )
×
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc