• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12430538171

20 Dec 2024 11:21AM UTC coverage: 58.716% (+0.1%) from 58.576%
12430538171

push

github

web-flow
Merge pull request #9381 from yyforyongyu/fix-no-space-left

workflows: fix no space left error

135265 of 230373 relevant lines covered (58.72%)

19174.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.28
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
24
        // DefaultPenaltyHalfLife is the default half-life duration. The
25
        // half-life duration defines after how much time a penalized node or
26
        // channel is back at 50% probability.
27
        DefaultPenaltyHalfLife = time.Hour
28

29
        // minSecondChanceInterval is the minimum time required between
30
        // second-chance failures.
31
        //
32
        // If nodes return a channel policy related failure, they may get a
33
        // second chance to forward the payment. It could be that the channel
34
        // policy that we are aware of is not up to date. This is especially
35
        // important in case of mobile apps that are mostly offline.
36
        //
37
        // However, we don't want to give nodes the option to endlessly return
38
        // new channel updates so that we are kept busy trying to route through
39
        // that node until the payment loop times out.
40
        //
41
        // Therefore we only grant a second chance to a node if the previous
42
        // second chance is sufficiently long ago. This is what
43
        // minSecondChanceInterval defines. If a second policy failure comes in
44
        // within that interval, we will apply a penalty.
45
        //
46
        // Second chances granted are tracked on the level of node pairs. This
47
        // means that if a node has multiple channels to the same peer, they
48
        // will only get a single second chance to route to that peer again.
49
        // Nodes forward non-strict, so it isn't necessary to apply a less
50
        // restrictive channel level tracking scheme here.
51
        minSecondChanceInterval = time.Minute
52

53
        // DefaultMaxMcHistory is the default maximum history size.
54
        DefaultMaxMcHistory = 1000
55

56
        // DefaultMcFlushInterval is the default interval we use to flush MC state
57
        // to the database.
58
        DefaultMcFlushInterval = time.Second
59

60
        // prevSuccessProbability is the assumed probability for node pairs that
61
        // successfully relayed the previous attempt.
62
        prevSuccessProbability = 0.95
63

64
        // DefaultAprioriWeight is the default a priori weight. See
65
        // MissionControlConfig for further explanation.
66
        DefaultAprioriWeight = 0.5
67

68
        // DefaultMinFailureRelaxInterval is the default minimum time that must
69
        // have passed since the previously recorded failure before the failure
70
        // amount may be raised.
71
        DefaultMinFailureRelaxInterval = time.Minute
72

73
        // DefaultFeeEstimationTimeout is the default value for
74
        // FeeEstimationTimeout. It defines the maximum duration that the
75
        // probing fee estimation is allowed to take.
76
        DefaultFeeEstimationTimeout = time.Minute
77

78
        // DefaultMissionControlNamespace is the name of the default mission
79
        // control name space. This is used as the sub-bucket key within the
80
        // top level DB bucket to store mission control results.
81
        DefaultMissionControlNamespace = "default"
82
)
83

84
var (
85
        // ErrInvalidMcHistory is returned if we get a negative mission control
86
        // history count.
87
        ErrInvalidMcHistory = errors.New("mission control history must be " +
88
                ">= 0")
89

90
        // ErrInvalidFailureInterval is returned if we get an invalid failure
91
        // interval.
92
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
93
)
94

95
// NodeResults contains previous results from a node to its peers.
96
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
99
// MissionControl instances and will be the same regardless of namespace.
100
type mcConfig struct {
101
        // clock is a time source used by mission control.
102
        clock clock.Clock
103

104
        // selfNode is our pubkey.
105
        selfNode route.Vertex
106
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
109
// routing by external callers when sending payments throughout the network. It
110
// acts as a shared memory during routing attempts with the goal to optimize the
111
// payment attempt success rate.
112
//
113
// Failed payment attempts are reported to mission control. These reports are
114
// used to track the time of the last node or channel level failure. The time
115
// since the last failure is used to estimate a success probability that is fed
116
// into the path finding process for subsequent payment attempts.
117
type MissionControl struct {
118
        cfg *mcConfig
119

120
        // state is the internal mission control state that is input for
121
        // probability estimation.
122
        state *missionControlState
123

124
        store *missionControlStore
125

126
        // estimator is the probability estimator that is used with the payment
127
        // results that mission control collects.
128
        estimator Estimator
129

130
        // onConfigUpdate is a function that is called whenever the
131
        // mission control state is updated.
132
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
133

134
        log btclog.Logger
135

136
        mu sync.Mutex
137
}
138

139
// MissionController manages MissionControl instances in various namespaces.
140
type MissionController struct {
141
        db           kvdb.Backend
142
        cfg          *mcConfig
143
        defaultMCCfg *MissionControlConfig
144

145
        mc map[string]*MissionControl
146
        mu sync.Mutex
147

148
        // TODO(roasbeef): further counters, if vertex continually unavailable,
149
        // add to another generation
150

151
        // TODO(roasbeef): also add favorable metrics for nodes
152
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
37✔
158

37✔
159
        m.mu.Lock()
37✔
160
        defer m.mu.Unlock()
37✔
161

37✔
162
        if mc, ok := m.mc[ns]; ok {
73✔
163
                return mc, nil
36✔
164
        }
36✔
165

166
        return m.initMissionControl(ns)
1✔
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
171
func (m *MissionController) ListNamespaces() []string {
3✔
172
        m.mu.Lock()
3✔
173
        defer m.mu.Unlock()
3✔
174

3✔
175
        namespaces := make([]string, 0, len(m.mc))
3✔
176
        for ns := range m.mc {
8✔
177
                namespaces = append(namespaces, ns)
5✔
178
        }
5✔
179

180
        return namespaces
3✔
181
}
182

183
// MissionControlConfig defines parameters that control mission control
184
// behaviour.
185
type MissionControlConfig struct {
186
        // Estimator gives probability estimates for node pairs.
187
        Estimator Estimator
188

189
        // OnConfigUpdate is function that is called whenever the
190
        // mission control state is updated.
191
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
192

193
        // MaxMcHistory defines the maximum number of payment results that are
194
        // held on disk.
195
        MaxMcHistory int
196

197
        // McFlushInterval defines the ticker interval when we flush the
198
        // accumulated state to the DB.
199
        McFlushInterval time.Duration
200

201
        // MinFailureRelaxInterval is the minimum time that must have passed
202
        // since the previously recorded failure before the failure amount may
203
        // be raised.
204
        MinFailureRelaxInterval time.Duration
205
}
206

207
func (c *MissionControlConfig) validate() error {
34✔
208
        if c.MaxMcHistory < 0 {
34✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
34✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
34✔
217
}
218

219
// String returns a string representation of a mission control config.
220
func (c *MissionControlConfig) String() string {
3✔
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
3✔
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
3✔
223
}
3✔
224

225
// TimedPairResult describes a timestamped pair result.
226
type TimedPairResult struct {
227
        // FailTime is the time of the last failure.
228
        FailTime time.Time
229

230
        // FailAmt is the amount of the last failure. This amount may be pushed
231
        // up if a later success is higher than the last failed amount.
232
        FailAmt lnwire.MilliSatoshi
233

234
        // SuccessTime is the time of the last success.
235
        SuccessTime time.Time
236

237
        // SuccessAmt is the highest amount that successfully forwarded. This
238
        // isn't necessarily the last success amount. The value of this field
239
        // may also be pushed down if a later failure is lower than the highest
240
        // success amount. Because of this, SuccessAmt may not match
241
        // SuccessTime.
242
        SuccessAmt lnwire.MilliSatoshi
243
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
246
// control.
247
type MissionControlSnapshot struct {
248
        // Pairs is a list of channels for which specific information is
249
        // logged.
250
        Pairs []MissionControlPairSnapshot
251
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
254
// state in mission control.
255
type MissionControlPairSnapshot struct {
256
        // Pair is the node pair of which the state is described.
257
        Pair DirectedNodePair
258

259
        // TimedPairResult contains the data for this pair.
260
        TimedPairResult
261
}
262

263
// paymentResult is the information that becomes available when a payment
264
// attempt completes.
265
type paymentResult struct {
266
        id        uint64
267
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
268
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
269
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
270

271
        // failure holds information related to the failure of a payment. The
272
        // presence of this record indicates a payment failure. The absence of
273
        // this record indicates a successful payment.
274
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
275
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure *paymentFailure) *paymentResult {
79✔
280

79✔
281
        result := &paymentResult{
79✔
282
                id: id,
79✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
79✔
284
                        uint64(timeFwd.UnixNano()),
79✔
285
                ),
79✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
79✔
287
                        uint64(timeReply.UnixNano()),
79✔
288
                ),
79✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
79✔
290
        }
79✔
291

79✔
292
        if failure != nil {
135✔
293
                result.failure = tlv.SomeRecordT(
56✔
294
                        tlv.NewRecordT[tlv.TlvType3](*failure),
56✔
295
                )
56✔
296
        }
56✔
297

298
        return result
79✔
299
}
300

301
// NewMissionController returns a new instance of MissionController.
302
func NewMissionController(db kvdb.Backend, self route.Vertex,
303
        cfg *MissionControlConfig) (*MissionController, error) {
34✔
304

34✔
305
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
34✔
306
                cfg.Estimator)
34✔
307

34✔
308
        if err := cfg.validate(); err != nil {
34✔
309
                return nil, err
×
310
        }
×
311

312
        mcCfg := &mcConfig{
34✔
313
                clock:    clock.NewDefaultClock(),
34✔
314
                selfNode: self,
34✔
315
        }
34✔
316

34✔
317
        mgr := &MissionController{
34✔
318
                db:           db,
34✔
319
                defaultMCCfg: cfg,
34✔
320
                cfg:          mcCfg,
34✔
321
                mc:           make(map[string]*MissionControl),
34✔
322
        }
34✔
323

34✔
324
        if err := mgr.loadMissionControls(); err != nil {
34✔
325
                return nil, err
×
326
        }
×
327

328
        for _, mc := range mgr.mc {
69✔
329
                if err := mc.init(); err != nil {
35✔
330
                        return nil, err
×
331
                }
×
332
        }
333

334
        return mgr, nil
34✔
335
}
336

337
// loadMissionControls initialises a MissionControl in the default namespace if
338
// one does not yet exist. It then initialises a MissionControl for all other
339
// namespaces found in the DB.
340
//
341
// NOTE: this should only be called once during MissionController construction.
342
func (m *MissionController) loadMissionControls() error {
34✔
343
        m.mu.Lock()
34✔
344
        defer m.mu.Unlock()
34✔
345

34✔
346
        // Always initialise the default namespace.
34✔
347
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
34✔
348
        if err != nil {
34✔
349
                return err
×
350
        }
×
351

352
        namespaces := make(map[string]struct{})
34✔
353
        err = m.db.View(func(tx walletdb.ReadTx) error {
68✔
354
                mcStoreBkt := tx.ReadBucket(resultsKey)
34✔
355
                if mcStoreBkt == nil {
34✔
356
                        return fmt.Errorf("top level mission control bucket " +
×
357
                                "not found")
×
358
                }
×
359

360
                // Iterate through all the keys in the bucket and collect the
361
                // namespaces.
362
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
69✔
363
                        // We've already initialised the default namespace so
35✔
364
                        // we can skip it.
35✔
365
                        if string(k) == DefaultMissionControlNamespace {
69✔
366
                                return nil
34✔
367
                        }
34✔
368

369
                        namespaces[string(k)] = struct{}{}
1✔
370

1✔
371
                        return nil
1✔
372
                })
373
        }, func() {})
34✔
374
        if err != nil {
34✔
375
                return err
×
376
        }
×
377

378
        // Now, iterate through all the namespaces and initialise them.
379
        for ns := range namespaces {
35✔
380
                _, err = m.initMissionControl(ns)
1✔
381
                if err != nil {
1✔
382
                        return err
×
383
                }
×
384
        }
385

386
        return nil
34✔
387
}
388

389
// initMissionControl creates a new MissionControl instance with the given
390
// namespace if one does not yet exist.
391
//
392
// NOTE: the MissionController's mutex must be held before calling this method.
393
func (m *MissionController) initMissionControl(namespace string) (
394
        *MissionControl, error) {
36✔
395

36✔
396
        // If a mission control with this namespace has already been initialised
36✔
397
        // then there is nothing left to do.
36✔
398
        if mc, ok := m.mc[namespace]; ok {
36✔
399
                return mc, nil
×
400
        }
×
401

402
        cfg := m.defaultMCCfg
36✔
403

36✔
404
        store, err := newMissionControlStore(
36✔
405
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
36✔
406
                cfg.McFlushInterval,
36✔
407
        )
36✔
408
        if err != nil {
36✔
409
                return nil, err
×
410
        }
×
411

412
        mc := &MissionControl{
36✔
413
                cfg: m.cfg,
36✔
414
                state: newMissionControlState(
36✔
415
                        cfg.MinFailureRelaxInterval,
36✔
416
                ),
36✔
417
                store:          store,
36✔
418
                estimator:      cfg.Estimator,
36✔
419
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
36✔
420
                onConfigUpdate: cfg.OnConfigUpdate,
36✔
421
        }
36✔
422

36✔
423
        m.mc[namespace] = mc
36✔
424

36✔
425
        return mc, nil
36✔
426
}
427

428
// RunStoreTickers runs the mission controller store's tickers.
429
func (m *MissionController) RunStoreTickers() {
3✔
430
        m.mu.Lock()
3✔
431
        defer m.mu.Unlock()
3✔
432

3✔
433
        for _, mc := range m.mc {
6✔
434
                mc.store.run()
3✔
435
        }
3✔
436
}
437

438
// StopStoreTickers stops the mission control store's tickers.
439
func (m *MissionController) StopStoreTickers() {
3✔
440
        log.Debug("Stopping mission control store ticker")
3✔
441
        defer log.Debug("Mission control store ticker stopped")
3✔
442

3✔
443
        m.mu.Lock()
3✔
444
        defer m.mu.Unlock()
3✔
445

3✔
446
        for _, mc := range m.mc {
6✔
447
                mc.store.stop()
3✔
448
        }
3✔
449
}
450

451
// init initializes mission control with historical data.
452
func (m *MissionControl) init() error {
35✔
453
        m.log.Debugf("Mission control state reconstruction started")
35✔
454

35✔
455
        m.mu.Lock()
35✔
456
        defer m.mu.Unlock()
35✔
457

35✔
458
        start := time.Now()
35✔
459

35✔
460
        results, err := m.store.fetchAll()
35✔
461
        if err != nil {
35✔
462
                return err
×
463
        }
×
464

465
        for _, result := range results {
42✔
466
                _ = m.applyPaymentResult(result)
7✔
467
        }
7✔
468

469
        m.log.Debugf("Mission control state reconstruction finished: "+
35✔
470
                "n=%v, time=%v", len(results), time.Since(start))
35✔
471

35✔
472
        return nil
35✔
473
}
474

475
// GetConfig returns the config that mission control is currently configured
476
// with. All fields are copied by value, so we do not need to worry about
477
// mutation.
478
func (m *MissionControl) GetConfig() *MissionControlConfig {
3✔
479
        m.mu.Lock()
3✔
480
        defer m.mu.Unlock()
3✔
481

3✔
482
        return &MissionControlConfig{
3✔
483
                Estimator:               m.estimator,
3✔
484
                MaxMcHistory:            m.store.maxRecords,
3✔
485
                McFlushInterval:         m.store.flushInterval,
3✔
486
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
3✔
487
        }
3✔
488
}
3✔
489

490
// SetConfig validates the config provided and updates mission control's config
491
// if it is valid.
492
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
3✔
493
        if cfg == nil {
3✔
494
                return errors.New("nil mission control config")
×
495
        }
×
496

497
        if err := cfg.validate(); err != nil {
3✔
498
                return err
×
499
        }
×
500

501
        m.mu.Lock()
3✔
502
        defer m.mu.Unlock()
3✔
503

3✔
504
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
3✔
505
                cfg.Estimator)
3✔
506

3✔
507
        m.store.maxRecords = cfg.MaxMcHistory
3✔
508
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
3✔
509
        m.estimator = cfg.Estimator
3✔
510

3✔
511
        // Execute the callback function if it is set.
3✔
512
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
6✔
513
                f(cfg)
3✔
514
        })
3✔
515

516
        return nil
3✔
517
}
518

519
// ResetHistory resets the history of MissionControl returning it to a state as
520
// if no payment attempts have been made.
521
func (m *MissionControl) ResetHistory() error {
6✔
522
        m.mu.Lock()
6✔
523
        defer m.mu.Unlock()
6✔
524

6✔
525
        if err := m.store.clear(); err != nil {
6✔
526
                return err
×
527
        }
×
528

529
        m.state.resetHistory()
6✔
530

6✔
531
        m.log.Debugf("Mission control history cleared")
6✔
532

6✔
533
        return nil
6✔
534
}
535

536
// GetProbability is expected to return the success probability of a payment
537
// from fromNode along edge.
538
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
539
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
665✔
540

665✔
541
        m.mu.Lock()
665✔
542
        defer m.mu.Unlock()
665✔
543

665✔
544
        now := m.cfg.clock.Now()
665✔
545
        results, _ := m.state.getLastPairResult(fromNode)
665✔
546

665✔
547
        // Use a distinct probability estimation function for local channels.
665✔
548
        if fromNode == m.cfg.selfNode {
707✔
549
                return m.estimator.LocalPairProbability(now, results, toNode)
42✔
550
        }
42✔
551

552
        return m.estimator.PairProbability(
626✔
553
                now, results, toNode, amt, capacity,
626✔
554
        )
626✔
555
}
556

557
// GetHistorySnapshot takes a snapshot from the current mission control state
558
// and actual probability estimates.
559
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
1✔
560
        m.mu.Lock()
1✔
561
        defer m.mu.Unlock()
1✔
562

1✔
563
        m.log.Debugf("Requesting history snapshot from mission control")
1✔
564

1✔
565
        return m.state.getSnapshot()
1✔
566
}
1✔
567

568
// ImportHistory imports the set of mission control results provided to our
569
// in-memory state. These results are not persisted, so will not survive
570
// restarts.
571
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
572
        force bool) error {
3✔
573

3✔
574
        if history == nil {
3✔
575
                return errors.New("cannot import nil history")
×
576
        }
×
577

578
        m.mu.Lock()
3✔
579
        defer m.mu.Unlock()
3✔
580

3✔
581
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
3✔
582
                "control", len(history.Pairs))
3✔
583

3✔
584
        imported := m.state.importSnapshot(history, force)
3✔
585

3✔
586
        m.log.Infof("Imported %v results to mission control", imported)
3✔
587

3✔
588
        return nil
3✔
589
}
590

591
// GetPairHistorySnapshot returns the stored history for a given node pair.
592
func (m *MissionControl) GetPairHistorySnapshot(
593
        fromNode, toNode route.Vertex) TimedPairResult {
3✔
594

3✔
595
        m.mu.Lock()
3✔
596
        defer m.mu.Unlock()
3✔
597

3✔
598
        results, ok := m.state.getLastPairResult(fromNode)
3✔
599
        if !ok {
6✔
600
                return TimedPairResult{}
3✔
601
        }
3✔
602

603
        result, ok := results[toNode]
3✔
604
        if !ok {
3✔
605
                return TimedPairResult{}
×
606
        }
×
607

608
        return result
3✔
609
}
610

611
// ReportPaymentFail reports a failed payment to mission control as input for
612
// future probability estimates. The failureSourceIdx argument indicates the
613
// failure source. If it is nil, the failure source is unknown. This function
614
// returns a reason if this failure is a final failure. In that case no further
615
// payment attempts need to be made.
616
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
617
        failureSourceIdx *int, failure lnwire.FailureMessage) (
618
        *channeldb.FailureReason, error) {
49✔
619

49✔
620
        timestamp := m.cfg.clock.Now()
49✔
621

49✔
622
        result := newPaymentResult(
49✔
623
                paymentID, extractMCRoute(rt), timestamp, timestamp,
49✔
624
                newPaymentFailure(failureSourceIdx, failure),
49✔
625
        )
49✔
626

49✔
627
        return m.processPaymentResult(result)
49✔
628
}
49✔
629

630
// ReportPaymentSuccess reports a successful payment to mission control as input
631
// for future probability estimates.
632
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
633
        rt *route.Route) error {
25✔
634

25✔
635
        timestamp := m.cfg.clock.Now()
25✔
636

25✔
637
        result := newPaymentResult(
25✔
638
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
25✔
639
        )
25✔
640

25✔
641
        _, err := m.processPaymentResult(result)
25✔
642

25✔
643
        return err
25✔
644
}
25✔
645

646
// processPaymentResult stores a payment result in the mission control store and
647
// updates mission control's in-memory state.
648
func (m *MissionControl) processPaymentResult(result *paymentResult) (
649
        *channeldb.FailureReason, error) {
71✔
650

71✔
651
        // Store complete result in database.
71✔
652
        m.store.AddResult(result)
71✔
653

71✔
654
        m.mu.Lock()
71✔
655
        defer m.mu.Unlock()
71✔
656

71✔
657
        // Apply result to update mission control state.
71✔
658
        reason := m.applyPaymentResult(result)
71✔
659

71✔
660
        return reason, nil
71✔
661
}
71✔
662

663
// applyPaymentResult applies a payment result as input for future probability
664
// estimates. It returns a bool indicating whether this error is a final error
665
// and no further payment attempts need to be made.
666
func (m *MissionControl) applyPaymentResult(
667
        result *paymentResult) *channeldb.FailureReason {
75✔
668

75✔
669
        // Interpret result.
75✔
670
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
75✔
671

75✔
672
        if i.policyFailure != nil {
87✔
673
                if m.state.requestSecondChance(
12✔
674
                        time.Unix(0, int64(result.timeReply.Val)),
12✔
675
                        i.policyFailure.From, i.policyFailure.To,
12✔
676
                ) {
21✔
677
                        return nil
9✔
678
                }
9✔
679
        }
680

681
        // If there is a node-level failure, record a failure for every tried
682
        // connection of that node. A node-level failure can be considered as a
683
        // failure that would have occurred with any of the node's channels.
684
        //
685
        // Ideally we'd also record the failure for the untried connections of
686
        // the node. Unfortunately this would require access to the graph and
687
        // adding this dependency and db calls does not outweigh the benefits.
688
        //
689
        // Untried connections will fall back to the node probability. After the
690
        // call to setAllPairResult below, the node probability will be equal to
691
        // the probability of the tried channels except that the a priori
692
        // probability is mixed in too. This effect is controlled by the
693
        // aprioriWeight parameter. If that parameter isn't set to an extreme
694
        // and there are a few known connections, there shouldn't be much of a
695
        // difference. The largest difference occurs when aprioriWeight is 1. In
696
        // that case, a node-level failure would not be applied to untried
697
        // channels.
698
        if i.nodeFailure != nil {
75✔
699
                m.log.Debugf("Reporting node failure to Mission Control: "+
6✔
700
                        "node=%v", *i.nodeFailure)
6✔
701

6✔
702
                m.state.setAllFail(
6✔
703
                        *i.nodeFailure,
6✔
704
                        time.Unix(0, int64(result.timeReply.Val)),
6✔
705
                )
6✔
706
        }
6✔
707

708
        for pair, pairResult := range i.pairResults {
218✔
709
                pairResult := pairResult
149✔
710

149✔
711
                if pairResult.success {
233✔
712
                        m.log.Debugf("Reporting pair success to Mission "+
84✔
713
                                "Control: pair=%v, amt=%v",
84✔
714
                                pair, pairResult.amt)
84✔
715
                } else {
152✔
716
                        m.log.Debugf("Reporting pair failure to Mission "+
68✔
717
                                "Control: pair=%v, amt=%v",
68✔
718
                                pair, pairResult.amt)
68✔
719
                }
68✔
720

721
                m.state.setLastPairResult(
149✔
722
                        pair.From, pair.To,
149✔
723
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
149✔
724
                        false,
149✔
725
                )
149✔
726
        }
727

728
        return i.finalFailureReason
69✔
729
}
730

731
// namespacedDB is an implementation of the missionControlDB that gives a user
732
// of the interface access to a namespaced bucket within the top level mission
733
// control bucket.
734
type namespacedDB struct {
735
        topLevelBucketKey []byte
736
        namespace         []byte
737
        db                kvdb.Backend
738
}
739

740
// A compile-time check to ensure that namespacedDB implements missionControlDB.
741
var _ missionControlDB = (*namespacedDB)(nil)
742

743
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
744
// default namespace.
745
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
4✔
746
        return newNamespacedDB(db, DefaultMissionControlNamespace)
4✔
747
}
4✔
748

749
// newNamespacedDB creates a new instance of missionControlDB where the DB will
750
// have access to a namespaced bucket within the top level mission control
751
// bucket.
752
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
40✔
753
        return &namespacedDB{
40✔
754
                db:                db,
40✔
755
                namespace:         []byte(namespace),
40✔
756
                topLevelBucketKey: resultsKey,
40✔
757
        }
40✔
758
}
40✔
759

760
// update can be used to perform reads and writes on the given bucket.
761
//
762
// NOTE: this is part of the missionControlDB interface.
763
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
764
        reset func()) error {
49✔
765

49✔
766
        return n.db.Update(func(tx kvdb.RwTx) error {
98✔
767
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
49✔
768
                if err != nil {
49✔
769
                        return fmt.Errorf("cannot create top level mission "+
×
770
                                "control bucket: %w", err)
×
771
                }
×
772

773
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
49✔
774
                        n.namespace,
49✔
775
                )
49✔
776
                if err != nil {
49✔
777
                        return fmt.Errorf("cannot create namespaced bucket "+
×
778
                                "(%s) in mission control store: %w",
×
779
                                n.namespace, err)
×
780
                }
×
781

782
                return f(namespacedBkt)
49✔
783
        }, reset)
784
}
785

786
// view can be used to perform reads on the given bucket.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
790
        reset func()) error {
50✔
791

50✔
792
        return n.db.View(func(tx kvdb.RTx) error {
100✔
793
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
50✔
794
                if mcStoreBkt == nil {
50✔
795
                        return fmt.Errorf("top level mission control bucket " +
×
796
                                "not found")
×
797
                }
×
798

799
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
50✔
800
                if namespacedBkt == nil {
50✔
801
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
802
                                "in mission control store", n.namespace)
×
803
                }
×
804

805
                return f(namespacedBkt)
50✔
806
        }, reset)
807
}
808

809
// purge will delete all the contents in the namespace.
810
//
811
// NOTE: this is part of the missionControlDB interface.
812
func (n *namespacedDB) purge() error {
6✔
813
        return n.db.Update(func(tx kvdb.RwTx) error {
12✔
814
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
6✔
815
                if mcStoreBkt == nil {
6✔
816
                        return nil
×
817
                }
×
818

819
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
6✔
820
                if err != nil {
6✔
821
                        return err
×
822
                }
×
823

824
                _, err = mcStoreBkt.CreateBucket(n.namespace)
6✔
825

6✔
826
                return err
6✔
827
        }, func() {})
6✔
828
}
829

830
// paymentFailure represents the presence of a payment failure. It may or may
831
// not include additional information about said failure.
832
type paymentFailure struct {
833
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
834
}
835

836
// newPaymentFailure constructs a new paymentFailure struct. If the source
837
// index is nil, then an empty paymentFailure is returned. This represents a
838
// failure with unknown details. Otherwise, the index and failure message are
839
// used to populate the info field of the paymentFailure.
840
func newPaymentFailure(sourceIdx *int,
841
        failureMsg lnwire.FailureMessage) *paymentFailure {
80✔
842

80✔
843
        if sourceIdx == nil {
81✔
844
                return &paymentFailure{}
1✔
845
        }
1✔
846

847
        info := paymentFailureInfo{
79✔
848
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
79✔
849
                        uint8(*sourceIdx),
79✔
850
                ),
79✔
851
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
79✔
852
        }
79✔
853

79✔
854
        return &paymentFailure{
79✔
855
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
79✔
856
        }
79✔
857
}
858

859
// Record returns a TLV record that can be used to encode/decode a
860
// paymentFailure to/from a TLV stream.
861
func (r *paymentFailure) Record() tlv.Record {
37✔
862
        recordSize := func() uint64 {
51✔
863
                var (
14✔
864
                        b   bytes.Buffer
14✔
865
                        buf [8]byte
14✔
866
                )
14✔
867
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
14✔
868
                        panic(err)
×
869
                }
870

871
                return uint64(len(b.Bytes()))
14✔
872
        }
873

874
        return tlv.MakeDynamicRecord(
37✔
875
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
37✔
876
        )
37✔
877
}
878

879
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
25✔
880
        if v, ok := val.(*paymentFailure); ok {
50✔
881
                var recordProducers []tlv.RecordProducer
25✔
882
                v.info.WhenSome(
25✔
883
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
50✔
884
                                recordProducers = append(recordProducers, &r)
25✔
885
                        },
25✔
886
                )
887

888
                return lnwire.EncodeRecordsTo(
25✔
889
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
25✔
890
                )
25✔
891
        }
892

893
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
894
}
895

896
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
897
        l uint64) error {
24✔
898

24✔
899
        if v, ok := val.(*paymentFailure); ok {
48✔
900
                var h paymentFailure
24✔
901

24✔
902
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
24✔
903
                typeMap, err := lnwire.DecodeRecords(
24✔
904
                        r, lnwire.ProduceRecordsSorted(&info)...,
24✔
905
                )
24✔
906
                if err != nil {
24✔
907
                        return err
×
908
                }
×
909

910
                if _, ok := typeMap[h.info.TlvType()]; ok {
48✔
911
                        h.info = tlv.SomeRecordT(info)
24✔
912
                }
24✔
913

914
                *v = h
24✔
915

24✔
916
                return nil
24✔
917
        }
918

919
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
920
}
921

922
// paymentFailureInfo holds additional information about a payment failure.
923
type paymentFailureInfo struct {
924
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
925
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
926
}
927

928
// Record returns a TLV record that can be used to encode/decode a
929
// paymentFailureInfo to/from a TLV stream.
930
func (r *paymentFailureInfo) Record() tlv.Record {
46✔
931
        recordSize := func() uint64 {
71✔
932
                var (
25✔
933
                        b   bytes.Buffer
25✔
934
                        buf [8]byte
25✔
935
                )
25✔
936
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
25✔
937
                        panic(err)
×
938
                }
939

940
                return uint64(len(b.Bytes()))
25✔
941
        }
942

943
        return tlv.MakeDynamicRecord(
46✔
944
                0, r, recordSize, encodePaymentFailureInfo,
46✔
945
                decodePaymentFailureInfo,
46✔
946
        )
46✔
947
}
948

949
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
47✔
950
        if v, ok := val.(*paymentFailureInfo); ok {
94✔
951
                return lnwire.EncodeRecordsTo(
47✔
952
                        w, lnwire.ProduceRecordsSorted(
47✔
953
                                &v.sourceIdx, &v.msg,
47✔
954
                        ),
47✔
955
                )
47✔
956
        }
47✔
957

958
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
959
}
960

961
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
962
        l uint64) error {
24✔
963

24✔
964
        if v, ok := val.(*paymentFailureInfo); ok {
48✔
965
                var h paymentFailureInfo
24✔
966

24✔
967
                _, err := lnwire.DecodeRecords(
24✔
968
                        r,
24✔
969
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
24✔
970
                )
24✔
971
                if err != nil {
24✔
972
                        return err
×
973
                }
×
974

975
                *v = h
24✔
976

24✔
977
                return nil
24✔
978
        }
979

980
        return tlv.NewTypeForDecodingErr(
×
981
                val, "routing.paymentFailureInfo", l, l,
×
982
        )
×
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc