• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13236757158

10 Feb 2025 08:39AM UTC coverage: 57.649% (-1.2%) from 58.815%
13236757158

Pull #9493

github

ziggie1984
lncli: for some cmds we don't replace the data of the response.

For some cmds it is not very practical to replace the json output
because we might pipe it into other commands. For example when
creating the route we want to pipe it into sendtoRoute.
Pull Request #9493: For some lncli cmds we should not replace the content with other data

0 of 9 new or added lines in 2 files covered. (0.0%)

19535 existing lines in 252 files now uncovered.

103517 of 179563 relevant lines covered (57.65%)

24878.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.17
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
24
        // DefaultPenaltyHalfLife is the default half-life duration. The
25
        // half-life duration defines after how much time a penalized node or
26
        // channel is back at 50% probability.
27
        DefaultPenaltyHalfLife = time.Hour
28

29
        // minSecondChanceInterval is the minimum time required between
30
        // second-chance failures.
31
        //
32
        // If nodes return a channel policy related failure, they may get a
33
        // second chance to forward the payment. It could be that the channel
34
        // policy that we are aware of is not up to date. This is especially
35
        // important in case of mobile apps that are mostly offline.
36
        //
37
        // However, we don't want to give nodes the option to endlessly return
38
        // new channel updates so that we are kept busy trying to route through
39
        // that node until the payment loop times out.
40
        //
41
        // Therefore we only grant a second chance to a node if the previous
42
        // second chance is sufficiently long ago. This is what
43
        // minSecondChanceInterval defines. If a second policy failure comes in
44
        // within that interval, we will apply a penalty.
45
        //
46
        // Second chances granted are tracked on the level of node pairs. This
47
        // means that if a node has multiple channels to the same peer, they
48
        // will only get a single second chance to route to that peer again.
49
        // Nodes forward non-strict, so it isn't necessary to apply a less
50
        // restrictive channel level tracking scheme here.
51
        minSecondChanceInterval = time.Minute
52

53
        // DefaultMaxMcHistory is the default maximum history size.
54
        DefaultMaxMcHistory = 1000
55

56
        // DefaultMcFlushInterval is the default interval we use to flush MC state
57
        // to the database.
58
        DefaultMcFlushInterval = time.Second
59

60
        // prevSuccessProbability is the assumed probability for node pairs that
61
        // successfully relayed the previous attempt.
62
        prevSuccessProbability = 0.95
63

64
        // DefaultAprioriWeight is the default a priori weight. See
65
        // MissionControlConfig for further explanation.
66
        DefaultAprioriWeight = 0.5
67

68
        // DefaultMinFailureRelaxInterval is the default minimum time that must
69
        // have passed since the previously recorded failure before the failure
70
        // amount may be raised.
71
        DefaultMinFailureRelaxInterval = time.Minute
72

73
        // DefaultFeeEstimationTimeout is the default value for
74
        // FeeEstimationTimeout. It defines the maximum duration that the
75
        // probing fee estimation is allowed to take.
76
        DefaultFeeEstimationTimeout = time.Minute
77

78
        // DefaultMissionControlNamespace is the name of the default mission
79
        // control name space. This is used as the sub-bucket key within the
80
        // top level DB bucket to store mission control results.
81
        DefaultMissionControlNamespace = "default"
82
)
83

84
var (
85
        // ErrInvalidMcHistory is returned if we get a negative mission control
86
        // history count.
87
        ErrInvalidMcHistory = errors.New("mission control history must be " +
88
                ">= 0")
89

90
        // ErrInvalidFailureInterval is returned if we get an invalid failure
91
        // interval.
92
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
93
)
94

95
// NodeResults contains previous results from a node to its peers.
96
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
99
// MissionControl instances and will be the same regardless of namespace.
100
type mcConfig struct {
101
        // clock is a time source used by mission control.
102
        clock clock.Clock
103

104
        // selfNode is our pubkey.
105
        selfNode route.Vertex
106
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
109
// routing by external callers when sending payments throughout the network. It
110
// acts as a shared memory during routing attempts with the goal to optimize the
111
// payment attempt success rate.
112
//
113
// Failed payment attempts are reported to mission control. These reports are
114
// used to track the time of the last node or channel level failure. The time
115
// since the last failure is used to estimate a success probability that is fed
116
// into the path finding process for subsequent payment attempts.
117
type MissionControl struct {
118
        cfg *mcConfig
119

120
        // state is the internal mission control state that is input for
121
        // probability estimation.
122
        state *missionControlState
123

124
        store *missionControlStore
125

126
        // estimator is the probability estimator that is used with the payment
127
        // results that mission control collects.
128
        estimator Estimator
129

130
        // onConfigUpdate is a function that is called whenever the
131
        // mission control state is updated.
132
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
133

134
        log btclog.Logger
135

136
        mu sync.Mutex
137
}
138

139
// MissionController manages MissionControl instances in various namespaces.
140
type MissionController struct {
141
        db           kvdb.Backend
142
        cfg          *mcConfig
143
        defaultMCCfg *MissionControlConfig
144

145
        mc map[string]*MissionControl
146
        mu sync.Mutex
147

148
        // TODO(roasbeef): further counters, if vertex continually unavailable,
149
        // add to another generation
150

151
        // TODO(roasbeef): also add favorable metrics for nodes
152
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
34✔
158

34✔
159
        m.mu.Lock()
34✔
160
        defer m.mu.Unlock()
34✔
161

34✔
162
        if mc, ok := m.mc[ns]; ok {
67✔
163
                return mc, nil
33✔
164
        }
33✔
165

166
        return m.initMissionControl(ns)
1✔
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
171
func (m *MissionController) ListNamespaces() []string {
3✔
172
        m.mu.Lock()
3✔
173
        defer m.mu.Unlock()
3✔
174

3✔
175
        namespaces := make([]string, 0, len(m.mc))
3✔
176
        for ns := range m.mc {
8✔
177
                namespaces = append(namespaces, ns)
5✔
178
        }
5✔
179

180
        return namespaces
3✔
181
}
182

183
// MissionControlConfig defines parameters that control mission control
184
// behaviour.
185
type MissionControlConfig struct {
186
        // Estimator gives probability estimates for node pairs.
187
        Estimator Estimator
188

189
        // OnConfigUpdate is function that is called whenever the
190
        // mission control state is updated.
191
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
192

193
        // MaxMcHistory defines the maximum number of payment results that are
194
        // held on disk.
195
        MaxMcHistory int
196

197
        // McFlushInterval defines the ticker interval when we flush the
198
        // accumulated state to the DB.
199
        McFlushInterval time.Duration
200

201
        // MinFailureRelaxInterval is the minimum time that must have passed
202
        // since the previously recorded failure before the failure amount may
203
        // be raised.
204
        MinFailureRelaxInterval time.Duration
205
}
206

207
func (c *MissionControlConfig) validate() error {
31✔
208
        if c.MaxMcHistory < 0 {
31✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
31✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
31✔
217
}
218

219
// String returns a string representation of a mission control config.
UNCOV
220
func (c *MissionControlConfig) String() string {
×
UNCOV
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
×
UNCOV
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
×
UNCOV
223
}
×
224

225
// TimedPairResult describes a timestamped pair result.
226
type TimedPairResult struct {
227
        // FailTime is the time of the last failure.
228
        FailTime time.Time
229

230
        // FailAmt is the amount of the last failure. This amount may be pushed
231
        // up if a later success is higher than the last failed amount.
232
        FailAmt lnwire.MilliSatoshi
233

234
        // SuccessTime is the time of the last success.
235
        SuccessTime time.Time
236

237
        // SuccessAmt is the highest amount that successfully forwarded. This
238
        // isn't necessarily the last success amount. The value of this field
239
        // may also be pushed down if a later failure is lower than the highest
240
        // success amount. Because of this, SuccessAmt may not match
241
        // SuccessTime.
242
        SuccessAmt lnwire.MilliSatoshi
243
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
246
// control.
247
type MissionControlSnapshot struct {
248
        // Pairs is a list of channels for which specific information is
249
        // logged.
250
        Pairs []MissionControlPairSnapshot
251
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
254
// state in mission control.
255
type MissionControlPairSnapshot struct {
256
        // Pair is the node pair of which the state is described.
257
        Pair DirectedNodePair
258

259
        // TimedPairResult contains the data for this pair.
260
        TimedPairResult
261
}
262

263
// paymentResult is the information that becomes available when a payment
264
// attempt completes.
265
type paymentResult struct {
266
        id        uint64
267
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
268
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
269
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
270

271
        // failure holds information related to the failure of a payment. The
272
        // presence of this record indicates a payment failure. The absence of
273
        // this record indicates a successful payment.
274
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
275
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure *paymentFailure) *paymentResult {
76✔
280

76✔
281
        result := &paymentResult{
76✔
282
                id: id,
76✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
284
                        uint64(timeFwd.UnixNano()),
76✔
285
                ),
76✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
76✔
287
                        uint64(timeReply.UnixNano()),
76✔
288
                ),
76✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
76✔
290
        }
76✔
291

76✔
292
        if failure != nil {
129✔
293
                result.failure = tlv.SomeRecordT(
53✔
294
                        tlv.NewRecordT[tlv.TlvType3](*failure),
53✔
295
                )
53✔
296
        }
53✔
297

298
        return result
76✔
299
}
300

301
// NewMissionController returns a new instance of MissionController.
302
func NewMissionController(db kvdb.Backend, self route.Vertex,
303
        cfg *MissionControlConfig) (*MissionController, error) {
31✔
304

31✔
305
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
31✔
306
                cfg.Estimator)
31✔
307

31✔
308
        if err := cfg.validate(); err != nil {
31✔
309
                return nil, err
×
310
        }
×
311

312
        mcCfg := &mcConfig{
31✔
313
                clock:    clock.NewDefaultClock(),
31✔
314
                selfNode: self,
31✔
315
        }
31✔
316

31✔
317
        mgr := &MissionController{
31✔
318
                db:           db,
31✔
319
                defaultMCCfg: cfg,
31✔
320
                cfg:          mcCfg,
31✔
321
                mc:           make(map[string]*MissionControl),
31✔
322
        }
31✔
323

31✔
324
        if err := mgr.loadMissionControls(); err != nil {
31✔
325
                return nil, err
×
326
        }
×
327

328
        for _, mc := range mgr.mc {
63✔
329
                if err := mc.init(); err != nil {
32✔
330
                        return nil, err
×
331
                }
×
332
        }
333

334
        return mgr, nil
31✔
335
}
336

337
// loadMissionControls initialises a MissionControl in the default namespace if
338
// one does not yet exist. It then initialises a MissionControl for all other
339
// namespaces found in the DB.
340
//
341
// NOTE: this should only be called once during MissionController construction.
342
func (m *MissionController) loadMissionControls() error {
31✔
343
        m.mu.Lock()
31✔
344
        defer m.mu.Unlock()
31✔
345

31✔
346
        // Always initialise the default namespace.
31✔
347
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
31✔
348
        if err != nil {
31✔
349
                return err
×
350
        }
×
351

352
        namespaces := make(map[string]struct{})
31✔
353
        err = m.db.View(func(tx walletdb.ReadTx) error {
62✔
354
                mcStoreBkt := tx.ReadBucket(resultsKey)
31✔
355
                if mcStoreBkt == nil {
31✔
356
                        return fmt.Errorf("top level mission control bucket " +
×
357
                                "not found")
×
358
                }
×
359

360
                // Iterate through all the keys in the bucket and collect the
361
                // namespaces.
362
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
63✔
363
                        // We've already initialised the default namespace so
32✔
364
                        // we can skip it.
32✔
365
                        if string(k) == DefaultMissionControlNamespace {
63✔
366
                                return nil
31✔
367
                        }
31✔
368

369
                        namespaces[string(k)] = struct{}{}
1✔
370

1✔
371
                        return nil
1✔
372
                })
373
        }, func() {})
31✔
374
        if err != nil {
31✔
375
                return err
×
376
        }
×
377

378
        // Now, iterate through all the namespaces and initialise them.
379
        for ns := range namespaces {
32✔
380
                _, err = m.initMissionControl(ns)
1✔
381
                if err != nil {
1✔
382
                        return err
×
383
                }
×
384
        }
385

386
        return nil
31✔
387
}
388

389
// initMissionControl creates a new MissionControl instance with the given
390
// namespace if one does not yet exist.
391
//
392
// NOTE: the MissionController's mutex must be held before calling this method.
393
func (m *MissionController) initMissionControl(namespace string) (
394
        *MissionControl, error) {
33✔
395

33✔
396
        // If a mission control with this namespace has already been initialised
33✔
397
        // then there is nothing left to do.
33✔
398
        if mc, ok := m.mc[namespace]; ok {
33✔
399
                return mc, nil
×
400
        }
×
401

402
        cfg := m.defaultMCCfg
33✔
403

33✔
404
        store, err := newMissionControlStore(
33✔
405
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
33✔
406
                cfg.McFlushInterval,
33✔
407
        )
33✔
408
        if err != nil {
33✔
409
                return nil, err
×
410
        }
×
411

412
        mc := &MissionControl{
33✔
413
                cfg: m.cfg,
33✔
414
                state: newMissionControlState(
33✔
415
                        cfg.MinFailureRelaxInterval,
33✔
416
                ),
33✔
417
                store:          store,
33✔
418
                estimator:      cfg.Estimator,
33✔
419
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
33✔
420
                onConfigUpdate: cfg.OnConfigUpdate,
33✔
421
        }
33✔
422

33✔
423
        m.mc[namespace] = mc
33✔
424

33✔
425
        return mc, nil
33✔
426
}
427

428
// RunStoreTickers runs the mission controller store's tickers.
UNCOV
429
func (m *MissionController) RunStoreTickers() {
×
UNCOV
430
        m.mu.Lock()
×
UNCOV
431
        defer m.mu.Unlock()
×
UNCOV
432

×
UNCOV
433
        for _, mc := range m.mc {
×
UNCOV
434
                mc.store.run()
×
UNCOV
435
        }
×
436
}
437

438
// StopStoreTickers stops the mission control store's tickers.
UNCOV
439
func (m *MissionController) StopStoreTickers() {
×
UNCOV
440
        log.Debug("Stopping mission control store ticker")
×
UNCOV
441
        defer log.Debug("Mission control store ticker stopped")
×
UNCOV
442

×
UNCOV
443
        m.mu.Lock()
×
UNCOV
444
        defer m.mu.Unlock()
×
UNCOV
445

×
UNCOV
446
        for _, mc := range m.mc {
×
UNCOV
447
                mc.store.stop()
×
UNCOV
448
        }
×
449
}
450

451
// init initializes mission control with historical data.
452
func (m *MissionControl) init() error {
32✔
453
        m.log.Debugf("Mission control state reconstruction started")
32✔
454

32✔
455
        m.mu.Lock()
32✔
456
        defer m.mu.Unlock()
32✔
457

32✔
458
        start := time.Now()
32✔
459

32✔
460
        results, err := m.store.fetchAll()
32✔
461
        if err != nil {
32✔
462
                return err
×
463
        }
×
464

465
        for _, result := range results {
36✔
466
                _ = m.applyPaymentResult(result)
4✔
467
        }
4✔
468

469
        m.log.Debugf("Mission control state reconstruction finished: "+
32✔
470
                "n=%v, time=%v", len(results), time.Since(start))
32✔
471

32✔
472
        return nil
32✔
473
}
474

475
// GetConfig returns the config that mission control is currently configured
476
// with. All fields are copied by value, so we do not need to worry about
477
// mutation.
UNCOV
478
func (m *MissionControl) GetConfig() *MissionControlConfig {
×
UNCOV
479
        m.mu.Lock()
×
UNCOV
480
        defer m.mu.Unlock()
×
UNCOV
481

×
UNCOV
482
        return &MissionControlConfig{
×
UNCOV
483
                Estimator:               m.estimator,
×
UNCOV
484
                MaxMcHistory:            m.store.maxRecords,
×
UNCOV
485
                McFlushInterval:         m.store.flushInterval,
×
UNCOV
486
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
×
UNCOV
487
        }
×
UNCOV
488
}
×
489

490
// SetConfig validates the config provided and updates mission control's config
491
// if it is valid.
UNCOV
492
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
×
UNCOV
493
        if cfg == nil {
×
494
                return errors.New("nil mission control config")
×
495
        }
×
496

UNCOV
497
        if err := cfg.validate(); err != nil {
×
498
                return err
×
499
        }
×
500

UNCOV
501
        m.mu.Lock()
×
UNCOV
502
        defer m.mu.Unlock()
×
UNCOV
503

×
UNCOV
504
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
×
UNCOV
505
                cfg.Estimator)
×
UNCOV
506

×
UNCOV
507
        m.store.maxRecords = cfg.MaxMcHistory
×
UNCOV
508
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
×
UNCOV
509
        m.estimator = cfg.Estimator
×
UNCOV
510

×
UNCOV
511
        // Execute the callback function if it is set.
×
UNCOV
512
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
×
UNCOV
513
                f(cfg)
×
UNCOV
514
        })
×
515

UNCOV
516
        return nil
×
517
}
518

519
// ResetHistory resets the history of MissionControl returning it to a state as
520
// if no payment attempts have been made.
521
func (m *MissionControl) ResetHistory() error {
3✔
522
        m.mu.Lock()
3✔
523
        defer m.mu.Unlock()
3✔
524

3✔
525
        if err := m.store.clear(); err != nil {
3✔
526
                return err
×
527
        }
×
528

529
        m.state.resetHistory()
3✔
530

3✔
531
        m.log.Debugf("Mission control history cleared")
3✔
532

3✔
533
        return nil
3✔
534
}
535

536
// GetProbability is expected to return the success probability of a payment
537
// from fromNode along edge.
538
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
539
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
662✔
540

662✔
541
        m.mu.Lock()
662✔
542
        defer m.mu.Unlock()
662✔
543

662✔
544
        now := m.cfg.clock.Now()
662✔
545
        results, _ := m.state.getLastPairResult(fromNode)
662✔
546

662✔
547
        // Use a distinct probability estimation function for local channels.
662✔
548
        if fromNode == m.cfg.selfNode {
701✔
549
                return m.estimator.LocalPairProbability(now, results, toNode)
39✔
550
        }
39✔
551

552
        return m.estimator.PairProbability(
623✔
553
                now, results, toNode, amt, capacity,
623✔
554
        )
623✔
555
}
556

557
// GetHistorySnapshot takes a snapshot from the current mission control state
558
// and actual probability estimates.
559
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
1✔
560
        m.mu.Lock()
1✔
561
        defer m.mu.Unlock()
1✔
562

1✔
563
        m.log.Debugf("Requesting history snapshot from mission control")
1✔
564

1✔
565
        return m.state.getSnapshot()
1✔
566
}
1✔
567

568
// ImportHistory imports the set of mission control results provided to our
569
// in-memory state. These results are not persisted, so will not survive
570
// restarts.
571
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
UNCOV
572
        force bool) error {
×
UNCOV
573

×
UNCOV
574
        if history == nil {
×
575
                return errors.New("cannot import nil history")
×
576
        }
×
577

UNCOV
578
        m.mu.Lock()
×
UNCOV
579
        defer m.mu.Unlock()
×
UNCOV
580

×
UNCOV
581
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
×
UNCOV
582
                "control", len(history.Pairs))
×
UNCOV
583

×
UNCOV
584
        imported := m.state.importSnapshot(history, force)
×
UNCOV
585

×
UNCOV
586
        m.log.Infof("Imported %v results to mission control", imported)
×
UNCOV
587

×
UNCOV
588
        return nil
×
589
}
590

591
// GetPairHistorySnapshot returns the stored history for a given node pair.
592
func (m *MissionControl) GetPairHistorySnapshot(
UNCOV
593
        fromNode, toNode route.Vertex) TimedPairResult {
×
UNCOV
594

×
UNCOV
595
        m.mu.Lock()
×
UNCOV
596
        defer m.mu.Unlock()
×
UNCOV
597

×
UNCOV
598
        results, ok := m.state.getLastPairResult(fromNode)
×
UNCOV
599
        if !ok {
×
UNCOV
600
                return TimedPairResult{}
×
UNCOV
601
        }
×
602

UNCOV
603
        result, ok := results[toNode]
×
UNCOV
604
        if !ok {
×
605
                return TimedPairResult{}
×
606
        }
×
607

UNCOV
608
        return result
×
609
}
610

611
// ReportPaymentFail reports a failed payment to mission control as input for
612
// future probability estimates. The failureSourceIdx argument indicates the
613
// failure source. If it is nil, the failure source is unknown. This function
614
// returns a reason if this failure is a final failure. In that case no further
615
// payment attempts need to be made.
616
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
617
        failureSourceIdx *int, failure lnwire.FailureMessage) (
618
        *channeldb.FailureReason, error) {
46✔
619

46✔
620
        timestamp := m.cfg.clock.Now()
46✔
621

46✔
622
        result := newPaymentResult(
46✔
623
                paymentID, extractMCRoute(rt), timestamp, timestamp,
46✔
624
                newPaymentFailure(failureSourceIdx, failure),
46✔
625
        )
46✔
626

46✔
627
        return m.processPaymentResult(result)
46✔
628
}
46✔
629

630
// ReportPaymentSuccess reports a successful payment to mission control as input
631
// for future probability estimates.
632
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
633
        rt *route.Route) error {
22✔
634

22✔
635
        timestamp := m.cfg.clock.Now()
22✔
636

22✔
637
        result := newPaymentResult(
22✔
638
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
22✔
639
        )
22✔
640

22✔
641
        _, err := m.processPaymentResult(result)
22✔
642

22✔
643
        return err
22✔
644
}
22✔
645

646
// processPaymentResult stores a payment result in the mission control store and
647
// updates mission control's in-memory state.
648
func (m *MissionControl) processPaymentResult(result *paymentResult) (
649
        *channeldb.FailureReason, error) {
68✔
650

68✔
651
        // Store complete result in database.
68✔
652
        m.store.AddResult(result)
68✔
653

68✔
654
        m.mu.Lock()
68✔
655
        defer m.mu.Unlock()
68✔
656

68✔
657
        // Apply result to update mission control state.
68✔
658
        reason := m.applyPaymentResult(result)
68✔
659

68✔
660
        return reason, nil
68✔
661
}
68✔
662

663
// applyPaymentResult applies a payment result as input for future probability
664
// estimates. It returns a bool indicating whether this error is a final error
665
// and no further payment attempts need to be made.
666
func (m *MissionControl) applyPaymentResult(
667
        result *paymentResult) *channeldb.FailureReason {
72✔
668

72✔
669
        // Interpret result.
72✔
670
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
72✔
671

72✔
672
        if i.policyFailure != nil {
81✔
673
                if m.state.requestSecondChance(
9✔
674
                        time.Unix(0, int64(result.timeReply.Val)),
9✔
675
                        i.policyFailure.From, i.policyFailure.To,
9✔
676
                ) {
15✔
677
                        return nil
6✔
678
                }
6✔
679
        }
680

681
        // If there is a node-level failure, record a failure for every tried
682
        // connection of that node. A node-level failure can be considered as a
683
        // failure that would have occurred with any of the node's channels.
684
        //
685
        // Ideally we'd also record the failure for the untried connections of
686
        // the node. Unfortunately this would require access to the graph and
687
        // adding this dependency and db calls does not outweigh the benefits.
688
        //
689
        // Untried connections will fall back to the node probability. After the
690
        // call to setAllPairResult below, the node probability will be equal to
691
        // the probability of the tried channels except that the a priori
692
        // probability is mixed in too. This effect is controlled by the
693
        // aprioriWeight parameter. If that parameter isn't set to an extreme
694
        // and there are a few known connections, there shouldn't be much of a
695
        // difference. The largest difference occurs when aprioriWeight is 1. In
696
        // that case, a node-level failure would not be applied to untried
697
        // channels.
698
        if i.nodeFailure != nil {
72✔
699
                m.log.Debugf("Reporting node failure to Mission Control: "+
6✔
700
                        "node=%v", *i.nodeFailure)
6✔
701

6✔
702
                m.state.setAllFail(
6✔
703
                        *i.nodeFailure,
6✔
704
                        time.Unix(0, int64(result.timeReply.Val)),
6✔
705
                )
6✔
706
        }
6✔
707

708
        for pair, pairResult := range i.pairResults {
212✔
709
                pairResult := pairResult
146✔
710

146✔
711
                if pairResult.success {
227✔
712
                        m.log.Debugf("Reporting pair success to Mission "+
81✔
713
                                "Control: pair=%v, amt=%v",
81✔
714
                                pair, pairResult.amt)
81✔
715
                } else {
146✔
716
                        m.log.Debugf("Reporting pair failure to Mission "+
65✔
717
                                "Control: pair=%v, amt=%v",
65✔
718
                                pair, pairResult.amt)
65✔
719
                }
65✔
720

721
                m.state.setLastPairResult(
146✔
722
                        pair.From, pair.To,
146✔
723
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
146✔
724
                        false,
146✔
725
                )
146✔
726
        }
727

728
        return i.finalFailureReason
66✔
729
}
730

731
// namespacedDB is an implementation of the missionControlDB that gives a user
732
// of the interface access to a namespaced bucket within the top level mission
733
// control bucket.
734
type namespacedDB struct {
735
        topLevelBucketKey []byte
736
        namespace         []byte
737
        db                kvdb.Backend
738
}
739

740
// A compile-time check to ensure that namespacedDB implements missionControlDB.
741
var _ missionControlDB = (*namespacedDB)(nil)
742

743
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
744
// default namespace.
745
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
4✔
746
        return newNamespacedDB(db, DefaultMissionControlNamespace)
4✔
747
}
4✔
748

749
// newNamespacedDB creates a new instance of missionControlDB where the DB will
750
// have access to a namespaced bucket within the top level mission control
751
// bucket.
752
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
37✔
753
        return &namespacedDB{
37✔
754
                db:                db,
37✔
755
                namespace:         []byte(namespace),
37✔
756
                topLevelBucketKey: resultsKey,
37✔
757
        }
37✔
758
}
37✔
759

760
// update can be used to perform reads and writes on the given bucket.
761
//
762
// NOTE: this is part of the missionControlDB interface.
763
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
764
        reset func()) error {
46✔
765

46✔
766
        return n.db.Update(func(tx kvdb.RwTx) error {
92✔
767
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
46✔
768
                if err != nil {
46✔
769
                        return fmt.Errorf("cannot create top level mission "+
×
770
                                "control bucket: %w", err)
×
771
                }
×
772

773
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
46✔
774
                        n.namespace,
46✔
775
                )
46✔
776
                if err != nil {
46✔
777
                        return fmt.Errorf("cannot create namespaced bucket "+
×
778
                                "(%s) in mission control store: %w",
×
779
                                n.namespace, err)
×
780
                }
×
781

782
                return f(namespacedBkt)
46✔
783
        }, reset)
784
}
785

786
// view can be used to perform reads on the given bucket.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
790
        reset func()) error {
47✔
791

47✔
792
        return n.db.View(func(tx kvdb.RTx) error {
94✔
793
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
47✔
794
                if mcStoreBkt == nil {
47✔
795
                        return fmt.Errorf("top level mission control bucket " +
×
796
                                "not found")
×
797
                }
×
798

799
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
47✔
800
                if namespacedBkt == nil {
47✔
801
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
802
                                "in mission control store", n.namespace)
×
803
                }
×
804

805
                return f(namespacedBkt)
47✔
806
        }, reset)
807
}
808

809
// purge will delete all the contents in the namespace.
810
//
811
// NOTE: this is part of the missionControlDB interface.
812
func (n *namespacedDB) purge() error {
3✔
813
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
814
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
815
                if mcStoreBkt == nil {
3✔
816
                        return nil
×
817
                }
×
818

819
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
820
                if err != nil {
3✔
821
                        return err
×
822
                }
×
823

824
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
825

3✔
826
                return err
3✔
827
        }, func() {})
3✔
828
}
829

830
// paymentFailure represents the presence of a payment failure. It may or may
831
// not include additional information about said failure.
832
type paymentFailure struct {
833
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
834
}
835

836
// newPaymentFailure constructs a new paymentFailure struct. If the source
837
// index is nil, then an empty paymentFailure is returned. This represents a
838
// failure with unknown details. Otherwise, the index and failure message are
839
// used to populate the info field of the paymentFailure.
840
func newPaymentFailure(sourceIdx *int,
841
        failureMsg lnwire.FailureMessage) *paymentFailure {
77✔
842

77✔
843
        if sourceIdx == nil {
78✔
844
                return &paymentFailure{}
1✔
845
        }
1✔
846

847
        info := paymentFailureInfo{
76✔
848
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
849
                        uint8(*sourceIdx),
76✔
850
                ),
76✔
851
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
76✔
852
        }
76✔
853

76✔
854
        return &paymentFailure{
76✔
855
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
76✔
856
        }
76✔
857
}
858

859
// Record returns a TLV record that can be used to encode/decode a
860
// paymentFailure to/from a TLV stream.
861
func (r *paymentFailure) Record() tlv.Record {
34✔
862
        recordSize := func() uint64 {
45✔
863
                var (
11✔
864
                        b   bytes.Buffer
11✔
865
                        buf [8]byte
11✔
866
                )
11✔
867
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
11✔
868
                        panic(err)
×
869
                }
870

871
                return uint64(len(b.Bytes()))
11✔
872
        }
873

874
        return tlv.MakeDynamicRecord(
34✔
875
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
34✔
876
        )
34✔
877
}
878

879
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
22✔
880
        if v, ok := val.(*paymentFailure); ok {
44✔
881
                var recordProducers []tlv.RecordProducer
22✔
882
                v.info.WhenSome(
22✔
883
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
44✔
884
                                recordProducers = append(recordProducers, &r)
22✔
885
                        },
22✔
886
                )
887

888
                return lnwire.EncodeRecordsTo(
22✔
889
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
22✔
890
                )
22✔
891
        }
892

893
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
894
}
895

896
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
897
        l uint64) error {
21✔
898

21✔
899
        if v, ok := val.(*paymentFailure); ok {
42✔
900
                var h paymentFailure
21✔
901

21✔
902
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
21✔
903
                typeMap, err := lnwire.DecodeRecords(
21✔
904
                        r, lnwire.ProduceRecordsSorted(&info)...,
21✔
905
                )
21✔
906
                if err != nil {
21✔
907
                        return err
×
908
                }
×
909

910
                if _, ok := typeMap[h.info.TlvType()]; ok {
42✔
911
                        h.info = tlv.SomeRecordT(info)
21✔
912
                }
21✔
913

914
                *v = h
21✔
915

21✔
916
                return nil
21✔
917
        }
918

919
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
920
}
921

922
// paymentFailureInfo holds additional information about a payment failure.
923
type paymentFailureInfo struct {
924
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
925
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
926
}
927

928
// Record returns a TLV record that can be used to encode/decode a
929
// paymentFailureInfo to/from a TLV stream.
930
func (r *paymentFailureInfo) Record() tlv.Record {
43✔
931
        recordSize := func() uint64 {
65✔
932
                var (
22✔
933
                        b   bytes.Buffer
22✔
934
                        buf [8]byte
22✔
935
                )
22✔
936
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
22✔
937
                        panic(err)
×
938
                }
939

940
                return uint64(len(b.Bytes()))
22✔
941
        }
942

943
        return tlv.MakeDynamicRecord(
43✔
944
                0, r, recordSize, encodePaymentFailureInfo,
43✔
945
                decodePaymentFailureInfo,
43✔
946
        )
43✔
947
}
948

949
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
44✔
950
        if v, ok := val.(*paymentFailureInfo); ok {
88✔
951
                return lnwire.EncodeRecordsTo(
44✔
952
                        w, lnwire.ProduceRecordsSorted(
44✔
953
                                &v.sourceIdx, &v.msg,
44✔
954
                        ),
44✔
955
                )
44✔
956
        }
44✔
957

958
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
959
}
960

961
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
962
        l uint64) error {
21✔
963

21✔
964
        if v, ok := val.(*paymentFailureInfo); ok {
42✔
965
                var h paymentFailureInfo
21✔
966

21✔
967
                _, err := lnwire.DecodeRecords(
21✔
968
                        r,
21✔
969
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
21✔
970
                )
21✔
971
                if err != nil {
21✔
972
                        return err
×
973
                }
×
974

975
                *v = h
21✔
976

21✔
977
                return nil
21✔
978
        }
979

980
        return tlv.NewTypeForDecodingErr(
×
981
                val, "routing.paymentFailureInfo", l, l,
×
982
        )
×
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc