• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11880886288

17 Nov 2024 05:35PM UTC coverage: 57.94% (-1.0%) from 58.935%
11880886288

Pull #9274

github

ziggie1984
docs: add release-notes
Pull Request #9274: Decrease outgoing htlc budget

3 of 4 new or added lines in 1 file covered. (75.0%)

19093 existing lines in 233 files now uncovered.

100208 of 172952 relevant lines covered (57.94%)

25486.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.96
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/build"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/clock"
17
        "github.com/lightningnetwork/lnd/fn"
18
        "github.com/lightningnetwork/lnd/kvdb"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/lightningnetwork/lnd/tlv"
22
)
23

24
const (
	// DefaultPenaltyHalfLife is the default half-life duration. The
	// half-life duration defines after how much time a penalized node or
	// channel is back at 50% probability.
	DefaultPenaltyHalfLife = time.Hour

	// minSecondChanceInterval is the minimum time required between
	// second-chance failures.
	//
	// If nodes return a channel policy related failure, they may get a
	// second chance to forward the payment. It could be that the channel
	// policy that we are aware of is not up to date. This is especially
	// important in case of mobile apps that are mostly offline.
	//
	// However, we don't want to give nodes the option to endlessly return
	// new channel updates so that we are kept busy trying to route through
	// that node until the payment loop times out.
	//
	// Therefore we only grant a second chance to a node if the previous
	// second chance is sufficiently long ago. This is what
	// minSecondChanceInterval defines. If a second policy failure comes in
	// within that interval, we will apply a penalty.
	//
	// Second chances granted are tracked on the level of node pairs. This
	// means that if a node has multiple channels to the same peer, they
	// will only get a single second chance to route to that peer again.
	// Nodes forward non-strict, so it isn't necessary to apply a less
	// restrictive channel level tracking scheme here.
	minSecondChanceInterval = time.Minute

	// DefaultMaxMcHistory is the default maximum history size.
	DefaultMaxMcHistory = 1000

	// DefaultMcFlushInterval is the default interval we use to flush MC
	// state to the database.
	DefaultMcFlushInterval = time.Second

	// prevSuccessProbability is the assumed probability for node pairs that
	// successfully relayed the previous attempt.
	prevSuccessProbability = 0.95

	// DefaultAprioriWeight is the default a priori weight. See
	// MissionControlConfig for further explanation.
	DefaultAprioriWeight = 0.5

	// DefaultMinFailureRelaxInterval is the default minimum time that must
	// have passed since the previously recorded failure before the failure
	// amount may be raised.
	DefaultMinFailureRelaxInterval = time.Minute

	// DefaultFeeEstimationTimeout is the default value for
	// FeeEstimationTimeout. It defines the maximum duration that the
	// probing fee estimation is allowed to take.
	DefaultFeeEstimationTimeout = time.Minute

	// DefaultMissionControlNamespace is the name of the default mission
	// control namespace. This is used as the sub-bucket key within the
	// top level DB bucket to store mission control results.
	DefaultMissionControlNamespace = "default"
)
84

85
var (
	// ErrInvalidMcHistory is returned if we get a negative mission control
	// history count. MissionControlConfig.validate returns it when
	// MaxMcHistory is below zero.
	ErrInvalidMcHistory = errors.New("mission control history must be " +
		">= 0")

	// ErrInvalidFailureInterval is returned if we get an invalid failure
	// interval. MissionControlConfig.validate returns it when
	// MinFailureRelaxInterval is below zero.
	ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
)
95

96
// NodeResults contains previous results from a node to its peers. The map is
// keyed by the peer's vertex and holds the timestamped result recorded for
// that peer.
type NodeResults map[route.Vertex]TimedPairResult
98

99
// mcConfig holds various config members that will be required by all
// MissionControl instances and will be the same regardless of namespace.
type mcConfig struct {
	// clock is a time source used by mission control. It supplies the
	// timestamps for reported payment results and probability queries.
	clock clock.Clock

	// selfNode is our pubkey. Pairs that originate from this node are
	// estimated with the local pair probability function.
	selfNode route.Vertex
}
108

109
// MissionControl contains state which summarizes the past attempts of HTLC
// routing by external callers when sending payments throughout the network. It
// acts as a shared memory during routing attempts with the goal to optimize the
// payment attempt success rate.
//
// Failed payment attempts are reported to mission control. These reports are
// used to track the time of the last node or channel level failure. The time
// since the last failure is used to estimate a success probability that is fed
// into the path finding process for subsequent payment attempts.
type MissionControl struct {
	// cfg holds the config members shared by all MissionControl instances:
	// the clock and our own node pubkey.
	cfg *mcConfig

	// state is the internal mission control state that is input for
	// probability estimation.
	state *missionControlState

	// store receives every reported payment result and is the source of
	// the historical results replayed by init.
	store *missionControlStore

	// estimator is the probability estimator that is used with the payment
	// results that mission control collects.
	estimator Estimator

	// onConfigUpdate is an optional callback that SetConfig invokes with
	// the newly applied config.
	onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]

	// log is the logger used by this instance.
	log btclog.Logger

	// mu is held by the methods of this type while they access state,
	// estimator and the store's configuration fields.
	mu sync.Mutex
}
139

140
// MissionController manages MissionControl instances in various namespaces.
type MissionController struct {
	// db is the backend that every namespaced store is created on, cfg is
	// the config shared by all instances and defaultMCCfg is the config
	// each newly initialised MissionControl starts out with.
	db           kvdb.Backend
	cfg          *mcConfig
	defaultMCCfg *MissionControlConfig

	// mc maps a namespace to its MissionControl instance. mu is held by
	// the exported methods of this type while they read or extend the map.
	mc map[string]*MissionControl
	mu sync.Mutex

	// TODO(roasbeef): further counters, if vertex continually unavailable,
	// add to another generation

	// TODO(roasbeef): also add favorable metrics for nodes
}
154

155
// GetNamespacedStore returns the MissionControl in the given namespace. If one
156
// does not yet exist, then it is initialised.
157
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
158
        error) {
34✔
159

34✔
160
        m.mu.Lock()
34✔
161
        defer m.mu.Unlock()
34✔
162

34✔
163
        if mc, ok := m.mc[ns]; ok {
67✔
164
                return mc, nil
33✔
165
        }
33✔
166

167
        return m.initMissionControl(ns)
1✔
168
}
169

170
// ListNamespaces returns a list of the namespaces that the MissionController
171
// is aware of.
172
func (m *MissionController) ListNamespaces() []string {
3✔
173
        m.mu.Lock()
3✔
174
        defer m.mu.Unlock()
3✔
175

3✔
176
        namespaces := make([]string, 0, len(m.mc))
3✔
177
        for ns := range m.mc {
8✔
178
                namespaces = append(namespaces, ns)
5✔
179
        }
5✔
180

181
        return namespaces
3✔
182
}
183

184
// MissionControlConfig defines parameters that control mission control
// behaviour.
type MissionControlConfig struct {
	// Estimator gives probability estimates for node pairs.
	Estimator Estimator

	// OnConfigUpdate is an optional function that is called with the new
	// config whenever the mission control config is updated.
	OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]

	// MaxMcHistory defines the maximum number of payment results that are
	// held on disk. It must not be negative.
	MaxMcHistory int

	// McFlushInterval defines the ticker interval when we flush the
	// accumulated state to the DB.
	McFlushInterval time.Duration

	// MinFailureRelaxInterval is the minimum time that must have passed
	// since the previously recorded failure before the failure amount may
	// be raised. It must not be negative.
	MinFailureRelaxInterval time.Duration
}
207

208
func (c *MissionControlConfig) validate() error {
31✔
209
        if c.MaxMcHistory < 0 {
31✔
210
                return ErrInvalidMcHistory
×
211
        }
×
212

213
        if c.MinFailureRelaxInterval < 0 {
31✔
214
                return ErrInvalidFailureInterval
×
215
        }
×
216

217
        return nil
31✔
218
}
219

220
// String returns a string representation of a mission control config.
221
func (c *MissionControlConfig) String() string {
31✔
222
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
31✔
223
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
31✔
224
}
31✔
225

226
// TimedPairResult describes a timestamped pair result. It is the value type
// stored per peer in NodeResults.
type TimedPairResult struct {
	// FailTime is the time of the last failure.
	FailTime time.Time

	// FailAmt is the amount of the last failure. This amount may be pushed
	// up if a later success is higher than the last failed amount.
	FailAmt lnwire.MilliSatoshi

	// SuccessTime is the time of the last success.
	SuccessTime time.Time

	// SuccessAmt is the highest amount that successfully forwarded. This
	// isn't necessarily the last success amount. The value of this field
	// may also be pushed down if a later failure is lower than the highest
	// success amount. Because of this, SuccessAmt may not match
	// SuccessTime.
	SuccessAmt lnwire.MilliSatoshi
}
245

246
// MissionControlSnapshot contains a snapshot of the current state of mission
// control. It is what GetHistorySnapshot returns and what ImportHistory
// accepts.
type MissionControlSnapshot struct {
	// Pairs is a list of node pairs for which specific information is
	// logged.
	Pairs []MissionControlPairSnapshot
}
253

254
// MissionControlPairSnapshot contains a snapshot of the current node pair
// state in mission control.
type MissionControlPairSnapshot struct {
	// Pair is the node pair of which the state is described.
	Pair DirectedNodePair

	// TimedPairResult contains the data for this pair. It is embedded, so
	// its fields are accessible directly on the snapshot.
	TimedPairResult
}
263

264
// paymentResult is the information that becomes available when a payment
// attempt completes.
type paymentResult struct {
	// id is the payment ID passed in by the reporter of the result.
	id uint64

	// timeFwd and timeReply hold the forward and reply timestamps as Unix
	// nanoseconds, which is how newPaymentResult encodes them.
	timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
	timeReply tlv.RecordT[tlv.TlvType1, uint64]

	// route is the mission control view of the route the attempt took.
	route tlv.RecordT[tlv.TlvType2, mcRoute]

	// failure holds information related to the failure of a payment. The
	// presence of this record indicates a payment failure. The absence of
	// this record indicates a successful payment.
	failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
}
277

278
// newPaymentResult constructs a new paymentResult.
279
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
280
        failure *paymentFailure) *paymentResult {
76✔
281

76✔
282
        result := &paymentResult{
76✔
283
                id: id,
76✔
284
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
285
                        uint64(timeFwd.UnixNano()),
76✔
286
                ),
76✔
287
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
76✔
288
                        uint64(timeReply.UnixNano()),
76✔
289
                ),
76✔
290
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
76✔
291
        }
76✔
292

76✔
293
        if failure != nil {
129✔
294
                result.failure = tlv.SomeRecordT(
53✔
295
                        tlv.NewRecordT[tlv.TlvType3](*failure),
53✔
296
                )
53✔
297
        }
53✔
298

299
        return result
76✔
300
}
301

302
// NewMissionController returns a new instance of MissionController.
303
func NewMissionController(db kvdb.Backend, self route.Vertex,
304
        cfg *MissionControlConfig) (*MissionController, error) {
31✔
305

31✔
306
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
31✔
307
                cfg.Estimator)
31✔
308

31✔
309
        if err := cfg.validate(); err != nil {
31✔
310
                return nil, err
×
311
        }
×
312

313
        mcCfg := &mcConfig{
31✔
314
                clock:    clock.NewDefaultClock(),
31✔
315
                selfNode: self,
31✔
316
        }
31✔
317

31✔
318
        mgr := &MissionController{
31✔
319
                db:           db,
31✔
320
                defaultMCCfg: cfg,
31✔
321
                cfg:          mcCfg,
31✔
322
                mc:           make(map[string]*MissionControl),
31✔
323
        }
31✔
324

31✔
325
        if err := mgr.loadMissionControls(); err != nil {
31✔
326
                return nil, err
×
327
        }
×
328

329
        for _, mc := range mgr.mc {
63✔
330
                if err := mc.init(); err != nil {
32✔
331
                        return nil, err
×
332
                }
×
333
        }
334

335
        return mgr, nil
31✔
336
}
337

338
// loadMissionControls initialises a MissionControl in the default namespace if
339
// one does not yet exist. It then initialises a MissionControl for all other
340
// namespaces found in the DB.
341
//
342
// NOTE: this should only be called once during MissionController construction.
343
func (m *MissionController) loadMissionControls() error {
31✔
344
        m.mu.Lock()
31✔
345
        defer m.mu.Unlock()
31✔
346

31✔
347
        // Always initialise the default namespace.
31✔
348
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
31✔
349
        if err != nil {
31✔
350
                return err
×
351
        }
×
352

353
        namespaces := make(map[string]struct{})
31✔
354
        err = m.db.View(func(tx walletdb.ReadTx) error {
62✔
355
                mcStoreBkt := tx.ReadBucket(resultsKey)
31✔
356
                if mcStoreBkt == nil {
31✔
357
                        return fmt.Errorf("top level mission control bucket " +
×
358
                                "not found")
×
359
                }
×
360

361
                // Iterate through all the keys in the bucket and collect the
362
                // namespaces.
363
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
63✔
364
                        // We've already initialised the default namespace so
32✔
365
                        // we can skip it.
32✔
366
                        if string(k) == DefaultMissionControlNamespace {
63✔
367
                                return nil
31✔
368
                        }
31✔
369

370
                        namespaces[string(k)] = struct{}{}
1✔
371

1✔
372
                        return nil
1✔
373
                })
374
        }, func() {})
31✔
375
        if err != nil {
31✔
376
                return err
×
377
        }
×
378

379
        // Now, iterate through all the namespaces and initialise them.
380
        for ns := range namespaces {
32✔
381
                _, err = m.initMissionControl(ns)
1✔
382
                if err != nil {
1✔
383
                        return err
×
384
                }
×
385
        }
386

387
        return nil
31✔
388
}
389

390
// initMissionControl creates a new MissionControl instance with the given
391
// namespace if one does not yet exist.
392
//
393
// NOTE: the MissionController's mutex must be held before calling this method.
394
func (m *MissionController) initMissionControl(namespace string) (
395
        *MissionControl, error) {
33✔
396

33✔
397
        // If a mission control with this namespace has already been initialised
33✔
398
        // then there is nothing left to do.
33✔
399
        if mc, ok := m.mc[namespace]; ok {
33✔
400
                return mc, nil
×
401
        }
×
402

403
        cfg := m.defaultMCCfg
33✔
404

33✔
405
        store, err := newMissionControlStore(
33✔
406
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
33✔
407
                cfg.McFlushInterval,
33✔
408
        )
33✔
409
        if err != nil {
33✔
410
                return nil, err
×
411
        }
×
412

413
        mc := &MissionControl{
33✔
414
                cfg:       m.cfg,
33✔
415
                state:     newMissionControlState(cfg.MinFailureRelaxInterval),
33✔
416
                store:     store,
33✔
417
                estimator: cfg.Estimator,
33✔
418
                log: build.NewPrefixLog(
33✔
419
                        fmt.Sprintf("[%s]:", namespace), log,
33✔
420
                ),
33✔
421
                onConfigUpdate: cfg.OnConfigUpdate,
33✔
422
        }
33✔
423

33✔
424
        m.mc[namespace] = mc
33✔
425

33✔
426
        return mc, nil
33✔
427
}
428

429
// RunStoreTickers runs the mission controller store's tickers.
UNCOV
430
func (m *MissionController) RunStoreTickers() {
×
UNCOV
431
        m.mu.Lock()
×
UNCOV
432
        defer m.mu.Unlock()
×
UNCOV
433

×
UNCOV
434
        for _, mc := range m.mc {
×
UNCOV
435
                mc.store.run()
×
UNCOV
436
        }
×
437
}
438

439
// StopStoreTickers stops the mission control store's tickers.
UNCOV
440
func (m *MissionController) StopStoreTickers() {
×
UNCOV
441
        log.Debug("Stopping mission control store ticker")
×
UNCOV
442
        defer log.Debug("Mission control store ticker stopped")
×
UNCOV
443

×
UNCOV
444
        m.mu.Lock()
×
UNCOV
445
        defer m.mu.Unlock()
×
UNCOV
446

×
UNCOV
447
        for _, mc := range m.mc {
×
UNCOV
448
                mc.store.stop()
×
UNCOV
449
        }
×
450
}
451

452
// init initializes mission control with historical data.
453
func (m *MissionControl) init() error {
32✔
454
        m.log.Debugf("Mission control state reconstruction started")
32✔
455

32✔
456
        m.mu.Lock()
32✔
457
        defer m.mu.Unlock()
32✔
458

32✔
459
        start := time.Now()
32✔
460

32✔
461
        results, err := m.store.fetchAll()
32✔
462
        if err != nil {
32✔
463
                return err
×
464
        }
×
465

466
        for _, result := range results {
36✔
467
                _ = m.applyPaymentResult(result)
4✔
468
        }
4✔
469

470
        m.log.Debugf("Mission control state reconstruction finished: "+
32✔
471
                "n=%v, time=%v", len(results), time.Since(start))
32✔
472

32✔
473
        return nil
32✔
474
}
475

476
// GetConfig returns the config that mission control is currently configured
477
// with. All fields are copied by value, so we do not need to worry about
478
// mutation.
UNCOV
479
func (m *MissionControl) GetConfig() *MissionControlConfig {
×
UNCOV
480
        m.mu.Lock()
×
UNCOV
481
        defer m.mu.Unlock()
×
UNCOV
482

×
UNCOV
483
        return &MissionControlConfig{
×
UNCOV
484
                Estimator:               m.estimator,
×
UNCOV
485
                MaxMcHistory:            m.store.maxRecords,
×
UNCOV
486
                McFlushInterval:         m.store.flushInterval,
×
UNCOV
487
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
×
UNCOV
488
        }
×
UNCOV
489
}
×
490

491
// SetConfig validates the config provided and updates mission control's config
492
// if it is valid.
UNCOV
493
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
×
UNCOV
494
        if cfg == nil {
×
495
                return errors.New("nil mission control config")
×
496
        }
×
497

UNCOV
498
        if err := cfg.validate(); err != nil {
×
499
                return err
×
500
        }
×
501

UNCOV
502
        m.mu.Lock()
×
UNCOV
503
        defer m.mu.Unlock()
×
UNCOV
504

×
UNCOV
505
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
×
UNCOV
506
                cfg.Estimator)
×
UNCOV
507

×
UNCOV
508
        m.store.maxRecords = cfg.MaxMcHistory
×
UNCOV
509
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
×
UNCOV
510
        m.estimator = cfg.Estimator
×
UNCOV
511

×
UNCOV
512
        // Execute the callback function if it is set.
×
UNCOV
513
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
×
UNCOV
514
                f(cfg)
×
UNCOV
515
        })
×
516

UNCOV
517
        return nil
×
518
}
519

520
// ResetHistory resets the history of MissionControl returning it to a state as
521
// if no payment attempts have been made.
522
func (m *MissionControl) ResetHistory() error {
3✔
523
        m.mu.Lock()
3✔
524
        defer m.mu.Unlock()
3✔
525

3✔
526
        if err := m.store.clear(); err != nil {
3✔
527
                return err
×
528
        }
×
529

530
        m.state.resetHistory()
3✔
531

3✔
532
        m.log.Debugf("Mission control history cleared")
3✔
533

3✔
534
        return nil
3✔
535
}
536

537
// GetProbability is expected to return the success probability of a payment
538
// from fromNode along edge.
539
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
540
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
662✔
541

662✔
542
        m.mu.Lock()
662✔
543
        defer m.mu.Unlock()
662✔
544

662✔
545
        now := m.cfg.clock.Now()
662✔
546
        results, _ := m.state.getLastPairResult(fromNode)
662✔
547

662✔
548
        // Use a distinct probability estimation function for local channels.
662✔
549
        if fromNode == m.cfg.selfNode {
701✔
550
                return m.estimator.LocalPairProbability(now, results, toNode)
39✔
551
        }
39✔
552

553
        return m.estimator.PairProbability(
623✔
554
                now, results, toNode, amt, capacity,
623✔
555
        )
623✔
556
}
557

558
// GetHistorySnapshot takes a snapshot from the current mission control state
559
// and actual probability estimates.
560
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
1✔
561
        m.mu.Lock()
1✔
562
        defer m.mu.Unlock()
1✔
563

1✔
564
        m.log.Debugf("Requesting history snapshot from mission control")
1✔
565

1✔
566
        return m.state.getSnapshot()
1✔
567
}
1✔
568

569
// ImportHistory imports the set of mission control results provided to our
570
// in-memory state. These results are not persisted, so will not survive
571
// restarts.
572
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
UNCOV
573
        force bool) error {
×
UNCOV
574

×
UNCOV
575
        if history == nil {
×
576
                return errors.New("cannot import nil history")
×
577
        }
×
578

UNCOV
579
        m.mu.Lock()
×
UNCOV
580
        defer m.mu.Unlock()
×
UNCOV
581

×
UNCOV
582
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
×
UNCOV
583
                "control", len(history.Pairs))
×
UNCOV
584

×
UNCOV
585
        imported := m.state.importSnapshot(history, force)
×
UNCOV
586

×
UNCOV
587
        m.log.Infof("Imported %v results to mission control", imported)
×
UNCOV
588

×
UNCOV
589
        return nil
×
590
}
591

592
// GetPairHistorySnapshot returns the stored history for a given node pair.
593
func (m *MissionControl) GetPairHistorySnapshot(
UNCOV
594
        fromNode, toNode route.Vertex) TimedPairResult {
×
UNCOV
595

×
UNCOV
596
        m.mu.Lock()
×
UNCOV
597
        defer m.mu.Unlock()
×
UNCOV
598

×
UNCOV
599
        results, ok := m.state.getLastPairResult(fromNode)
×
UNCOV
600
        if !ok {
×
UNCOV
601
                return TimedPairResult{}
×
UNCOV
602
        }
×
603

UNCOV
604
        result, ok := results[toNode]
×
UNCOV
605
        if !ok {
×
UNCOV
606
                return TimedPairResult{}
×
UNCOV
607
        }
×
608

UNCOV
609
        return result
×
610
}
611

612
// ReportPaymentFail reports a failed payment to mission control as input for
613
// future probability estimates. The failureSourceIdx argument indicates the
614
// failure source. If it is nil, the failure source is unknown. This function
615
// returns a reason if this failure is a final failure. In that case no further
616
// payment attempts need to be made.
617
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
618
        failureSourceIdx *int, failure lnwire.FailureMessage) (
619
        *channeldb.FailureReason, error) {
46✔
620

46✔
621
        timestamp := m.cfg.clock.Now()
46✔
622

46✔
623
        result := newPaymentResult(
46✔
624
                paymentID, extractMCRoute(rt), timestamp, timestamp,
46✔
625
                newPaymentFailure(failureSourceIdx, failure),
46✔
626
        )
46✔
627

46✔
628
        return m.processPaymentResult(result)
46✔
629
}
46✔
630

631
// ReportPaymentSuccess reports a successful payment to mission control as input
632
// for future probability estimates.
633
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
634
        rt *route.Route) error {
22✔
635

22✔
636
        timestamp := m.cfg.clock.Now()
22✔
637

22✔
638
        result := newPaymentResult(
22✔
639
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
22✔
640
        )
22✔
641

22✔
642
        _, err := m.processPaymentResult(result)
22✔
643

22✔
644
        return err
22✔
645
}
22✔
646

647
// processPaymentResult stores a payment result in the mission control store and
648
// updates mission control's in-memory state.
649
func (m *MissionControl) processPaymentResult(result *paymentResult) (
650
        *channeldb.FailureReason, error) {
68✔
651

68✔
652
        // Store complete result in database.
68✔
653
        m.store.AddResult(result)
68✔
654

68✔
655
        m.mu.Lock()
68✔
656
        defer m.mu.Unlock()
68✔
657

68✔
658
        // Apply result to update mission control state.
68✔
659
        reason := m.applyPaymentResult(result)
68✔
660

68✔
661
        return reason, nil
68✔
662
}
68✔
663

664
// applyPaymentResult applies a payment result as input for future probability
665
// estimates. It returns a bool indicating whether this error is a final error
666
// and no further payment attempts need to be made.
667
func (m *MissionControl) applyPaymentResult(
668
        result *paymentResult) *channeldb.FailureReason {
72✔
669

72✔
670
        // Interpret result.
72✔
671
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
72✔
672

72✔
673
        if i.policyFailure != nil {
81✔
674
                if m.state.requestSecondChance(
9✔
675
                        time.Unix(0, int64(result.timeReply.Val)),
9✔
676
                        i.policyFailure.From, i.policyFailure.To,
9✔
677
                ) {
15✔
678
                        return nil
6✔
679
                }
6✔
680
        }
681

682
        // If there is a node-level failure, record a failure for every tried
683
        // connection of that node. A node-level failure can be considered as a
684
        // failure that would have occurred with any of the node's channels.
685
        //
686
        // Ideally we'd also record the failure for the untried connections of
687
        // the node. Unfortunately this would require access to the graph and
688
        // adding this dependency and db calls does not outweigh the benefits.
689
        //
690
        // Untried connections will fall back to the node probability. After the
691
        // call to setAllPairResult below, the node probability will be equal to
692
        // the probability of the tried channels except that the a priori
693
        // probability is mixed in too. This effect is controlled by the
694
        // aprioriWeight parameter. If that parameter isn't set to an extreme
695
        // and there are a few known connections, there shouldn't be much of a
696
        // difference. The largest difference occurs when aprioriWeight is 1. In
697
        // that case, a node-level failure would not be applied to untried
698
        // channels.
699
        if i.nodeFailure != nil {
72✔
700
                m.log.Debugf("Reporting node failure to Mission Control: "+
6✔
701
                        "node=%v", *i.nodeFailure)
6✔
702

6✔
703
                m.state.setAllFail(
6✔
704
                        *i.nodeFailure,
6✔
705
                        time.Unix(0, int64(result.timeReply.Val)),
6✔
706
                )
6✔
707
        }
6✔
708

709
        for pair, pairResult := range i.pairResults {
212✔
710
                pairResult := pairResult
146✔
711

146✔
712
                if pairResult.success {
227✔
713
                        m.log.Debugf("Reporting pair success to Mission "+
81✔
714
                                "Control: pair=%v, amt=%v",
81✔
715
                                pair, pairResult.amt)
81✔
716
                } else {
146✔
717
                        m.log.Debugf("Reporting pair failure to Mission "+
65✔
718
                                "Control: pair=%v, amt=%v",
65✔
719
                                pair, pairResult.amt)
65✔
720
                }
65✔
721

722
                m.state.setLastPairResult(
146✔
723
                        pair.From, pair.To,
146✔
724
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
146✔
725
                        false,
146✔
726
                )
146✔
727
        }
728

729
        return i.finalFailureReason
66✔
730
}
731

732
// namespacedDB is an implementation of the missionControlDB that gives a user
// of the interface access to a namespaced bucket within the top level mission
// control bucket.
type namespacedDB struct {
	// topLevelBucketKey is the key of the top level mission control
	// bucket, namespace is the key of the nested bucket inside it that
	// this instance operates on, and db is the backend both live in.
	topLevelBucketKey []byte
	namespace         []byte
	db                kvdb.Backend
}
740

741
// A compile-time check to ensure that *namespacedDB implements the
// missionControlDB interface.
var _ missionControlDB = (*namespacedDB)(nil)
743

744
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
745
// default namespace.
746
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
4✔
747
        return newNamespacedDB(db, DefaultMissionControlNamespace)
4✔
748
}
4✔
749

750
// newNamespacedDB creates a new instance of missionControlDB where the DB will
751
// have access to a namespaced bucket within the top level mission control
752
// bucket.
753
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
37✔
754
        return &namespacedDB{
37✔
755
                db:                db,
37✔
756
                namespace:         []byte(namespace),
37✔
757
                topLevelBucketKey: resultsKey,
37✔
758
        }
37✔
759
}
37✔
760

761
// update can be used to perform reads and writes on the given bucket.
762
//
763
// NOTE: this is part of the missionControlDB interface.
764
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
765
        reset func()) error {
46✔
766

46✔
767
        return n.db.Update(func(tx kvdb.RwTx) error {
92✔
768
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
46✔
769
                if err != nil {
46✔
770
                        return fmt.Errorf("cannot create top level mission "+
×
771
                                "control bucket: %w", err)
×
772
                }
×
773

774
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
46✔
775
                        n.namespace,
46✔
776
                )
46✔
777
                if err != nil {
46✔
778
                        return fmt.Errorf("cannot create namespaced bucket "+
×
779
                                "(%s) in mission control store: %w",
×
780
                                n.namespace, err)
×
781
                }
×
782

783
                return f(namespacedBkt)
46✔
784
        }, reset)
785
}
786

787
// view can be used to perform reads on the given bucket.
788
//
789
// NOTE: this is part of the missionControlDB interface.
790
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
791
        reset func()) error {
47✔
792

47✔
793
        return n.db.View(func(tx kvdb.RTx) error {
94✔
794
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
47✔
795
                if mcStoreBkt == nil {
47✔
796
                        return fmt.Errorf("top level mission control bucket " +
×
797
                                "not found")
×
798
                }
×
799

800
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
47✔
801
                if namespacedBkt == nil {
47✔
802
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
803
                                "in mission control store", n.namespace)
×
804
                }
×
805

806
                return f(namespacedBkt)
47✔
807
        }, reset)
808
}
809

810
// purge will delete all the contents in the namespace.
811
//
812
// NOTE: this is part of the missionControlDB interface.
813
func (n *namespacedDB) purge() error {
3✔
814
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
815
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
816
                if mcStoreBkt == nil {
3✔
817
                        return nil
×
818
                }
×
819

820
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
821
                if err != nil {
3✔
822
                        return err
×
823
                }
×
824

825
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
826

3✔
827
                return err
3✔
828
        }, func() {})
3✔
829
}
830

831
// paymentFailure represents the presence of a payment failure. It may or may
832
// not include additional information about said failure.
833
type paymentFailure struct {
834
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
835
}
836

837
// newPaymentFailure constructs a new paymentFailure struct. If the source
838
// index is nil, then an empty paymentFailure is returned. This represents a
839
// failure with unknown details. Otherwise, the index and failure message are
840
// used to populate the info field of the paymentFailure.
841
func newPaymentFailure(sourceIdx *int,
842
        failureMsg lnwire.FailureMessage) *paymentFailure {
77✔
843

77✔
844
        if sourceIdx == nil {
78✔
845
                return &paymentFailure{}
1✔
846
        }
1✔
847

848
        info := paymentFailureInfo{
76✔
849
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
850
                        uint8(*sourceIdx),
76✔
851
                ),
76✔
852
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
76✔
853
        }
76✔
854

76✔
855
        return &paymentFailure{
76✔
856
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
76✔
857
        }
76✔
858
}
859

860
// Record returns a TLV record that can be used to encode/decode a
861
// paymentFailure to/from a TLV stream.
862
func (r *paymentFailure) Record() tlv.Record {
34✔
863
        recordSize := func() uint64 {
45✔
864
                var (
11✔
865
                        b   bytes.Buffer
11✔
866
                        buf [8]byte
11✔
867
                )
11✔
868
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
11✔
869
                        panic(err)
×
870
                }
871

872
                return uint64(len(b.Bytes()))
11✔
873
        }
874

875
        return tlv.MakeDynamicRecord(
34✔
876
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
34✔
877
        )
34✔
878
}
879

880
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
22✔
881
        if v, ok := val.(*paymentFailure); ok {
44✔
882
                var recordProducers []tlv.RecordProducer
22✔
883
                v.info.WhenSome(
22✔
884
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
44✔
885
                                recordProducers = append(recordProducers, &r)
22✔
886
                        },
22✔
887
                )
888

889
                return lnwire.EncodeRecordsTo(
22✔
890
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
22✔
891
                )
22✔
892
        }
893

894
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
895
}
896

897
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
898
        l uint64) error {
21✔
899

21✔
900
        if v, ok := val.(*paymentFailure); ok {
42✔
901
                var h paymentFailure
21✔
902

21✔
903
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
21✔
904
                typeMap, err := lnwire.DecodeRecords(
21✔
905
                        r, lnwire.ProduceRecordsSorted(&info)...,
21✔
906
                )
21✔
907
                if err != nil {
21✔
908
                        return err
×
909
                }
×
910

911
                if _, ok := typeMap[h.info.TlvType()]; ok {
42✔
912
                        h.info = tlv.SomeRecordT(info)
21✔
913
                }
21✔
914

915
                *v = h
21✔
916

21✔
917
                return nil
21✔
918
        }
919

920
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
921
}
922

923
// paymentFailureInfo holds additional information about a payment failure.
924
type paymentFailureInfo struct {
925
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
926
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
927
}
928

929
// Record returns a TLV record that can be used to encode/decode a
930
// paymentFailureInfo to/from a TLV stream.
931
func (r *paymentFailureInfo) Record() tlv.Record {
43✔
932
        recordSize := func() uint64 {
65✔
933
                var (
22✔
934
                        b   bytes.Buffer
22✔
935
                        buf [8]byte
22✔
936
                )
22✔
937
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
22✔
938
                        panic(err)
×
939
                }
940

941
                return uint64(len(b.Bytes()))
22✔
942
        }
943

944
        return tlv.MakeDynamicRecord(
43✔
945
                0, r, recordSize, encodePaymentFailureInfo,
43✔
946
                decodePaymentFailureInfo,
43✔
947
        )
43✔
948
}
949

950
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
44✔
951
        if v, ok := val.(*paymentFailureInfo); ok {
88✔
952
                return lnwire.EncodeRecordsTo(
44✔
953
                        w, lnwire.ProduceRecordsSorted(
44✔
954
                                &v.sourceIdx, &v.msg,
44✔
955
                        ),
44✔
956
                )
44✔
957
        }
44✔
958

959
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
960
}
961

962
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
963
        l uint64) error {
21✔
964

21✔
965
        if v, ok := val.(*paymentFailureInfo); ok {
42✔
966
                var h paymentFailureInfo
21✔
967

21✔
968
                _, err := lnwire.DecodeRecords(
21✔
969
                        r,
21✔
970
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
21✔
971
                )
21✔
972
                if err != nil {
21✔
973
                        return err
×
974
                }
×
975

976
                *v = h
21✔
977

21✔
978
                return nil
21✔
979
        }
980

981
        return tlv.NewTypeForDecodingErr(
×
982
                val, "routing.paymentFailureInfo", l, l,
×
983
        )
×
984
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc