• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.17
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
24
        // DefaultPenaltyHalfLife is the default half-life duration. The
25
        // half-life duration defines after how much time a penalized node or
26
        // channel is back at 50% probability.
27
        DefaultPenaltyHalfLife = time.Hour
28

29
        // minSecondChanceInterval is the minimum time required between
30
        // second-chance failures.
31
        //
32
        // If nodes return a channel policy related failure, they may get a
33
        // second chance to forward the payment. It could be that the channel
34
        // policy that we are aware of is not up to date. This is especially
35
        // important in case of mobile apps that are mostly offline.
36
        //
37
        // However, we don't want to give nodes the option to endlessly return
38
        // new channel updates so that we are kept busy trying to route through
39
        // that node until the payment loop times out.
40
        //
41
        // Therefore we only grant a second chance to a node if the previous
42
        // second chance is sufficiently long ago. This is what
43
        // minSecondChanceInterval defines. If a second policy failure comes in
44
        // within that interval, we will apply a penalty.
45
        //
46
        // Second chances granted are tracked on the level of node pairs. This
47
        // means that if a node has multiple channels to the same peer, they
48
        // will only get a single second chance to route to that peer again.
49
        // Nodes forward non-strict, so it isn't necessary to apply a less
50
        // restrictive channel level tracking scheme here.
51
        minSecondChanceInterval = time.Minute
52

53
        // DefaultMaxMcHistory is the default maximum history size.
54
        DefaultMaxMcHistory = 1000
55

56
        // DefaultMcFlushInterval is the default interval we use to flush MC state
57
        // to the database.
58
        DefaultMcFlushInterval = time.Second
59

60
        // prevSuccessProbability is the assumed probability for node pairs that
61
        // successfully relayed the previous attempt.
62
        prevSuccessProbability = 0.95
63

64
        // DefaultAprioriWeight is the default a priori weight. See
65
        // MissionControlConfig for further explanation.
66
        DefaultAprioriWeight = 0.5
67

68
        // DefaultMinFailureRelaxInterval is the default minimum time that must
69
        // have passed since the previously recorded failure before the failure
70
        // amount may be raised.
71
        DefaultMinFailureRelaxInterval = time.Minute
72

73
        // DefaultFeeEstimationTimeout is the default value for
74
        // FeeEstimationTimeout. It defines the maximum duration that the
75
        // probing fee estimation is allowed to take.
76
        DefaultFeeEstimationTimeout = time.Minute
77

78
        // DefaultMissionControlNamespace is the name of the default mission
79
        // control name space. This is used as the sub-bucket key within the
80
        // top level DB bucket to store mission control results.
81
        DefaultMissionControlNamespace = "default"
82
)
83

84
var (
85
        // ErrInvalidMcHistory is returned if we get a negative mission control
86
        // history count.
87
        ErrInvalidMcHistory = errors.New("mission control history must be " +
88
                ">= 0")
89

90
        // ErrInvalidFailureInterval is returned if we get an invalid failure
91
        // interval.
92
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
93
)
94

95
// NodeResults contains previous results from a node to its peers.
96
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
99
// MissionControl instances and will be the same regardless of namespace.
100
type mcConfig struct {
101
        // clock is a time source used by mission control.
102
        clock clock.Clock
103

104
        // selfNode is our pubkey.
105
        selfNode route.Vertex
106
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
109
// routing by external callers when sending payments throughout the network. It
110
// acts as a shared memory during routing attempts with the goal to optimize the
111
// payment attempt success rate.
112
//
113
// Failed payment attempts are reported to mission control. These reports are
114
// used to track the time of the last node or channel level failure. The time
115
// since the last failure is used to estimate a success probability that is fed
116
// into the path finding process for subsequent payment attempts.
117
type MissionControl struct {
118
        cfg *mcConfig
119

120
        // state is the internal mission control state that is input for
121
        // probability estimation.
122
        state *missionControlState
123

124
        store *missionControlStore
125

126
        // estimator is the probability estimator that is used with the payment
127
        // results that mission control collects.
128
        estimator Estimator
129

130
        // onConfigUpdate is a function that is called whenever the
131
        // mission control config is updated via SetConfig.
132
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
133

134
        log btclog.Logger
135

136
        mu sync.Mutex
137
}
138

139
// MissionController manages MissionControl instances in various namespaces.
140
type MissionController struct {
141
        db           kvdb.Backend
142
        cfg          *mcConfig
143
        defaultMCCfg *MissionControlConfig
144

145
        mc map[string]*MissionControl
146
        mu sync.Mutex
147

148
        // TODO(roasbeef): further counters, if vertex continually unavailable,
149
        // add to another generation
150

151
        // TODO(roasbeef): also add favorable metrics for nodes
152
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
34✔
158

34✔
159
        m.mu.Lock()
34✔
160
        defer m.mu.Unlock()
34✔
161

34✔
162
        if mc, ok := m.mc[ns]; ok {
67✔
163
                return mc, nil
33✔
164
        }
33✔
165

166
        return m.initMissionControl(ns)
1✔
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
171
func (m *MissionController) ListNamespaces() []string {
3✔
172
        m.mu.Lock()
3✔
173
        defer m.mu.Unlock()
3✔
174

3✔
175
        namespaces := make([]string, 0, len(m.mc))
3✔
176
        for ns := range m.mc {
8✔
177
                namespaces = append(namespaces, ns)
5✔
178
        }
5✔
179

180
        return namespaces
3✔
181
}
182

183
// MissionControlConfig defines parameters that control mission control
184
// behaviour.
185
type MissionControlConfig struct {
186
        // Estimator gives probability estimates for node pairs.
187
        Estimator Estimator
188

189
        // OnConfigUpdate is a function that is called whenever the
190
        // mission control config is updated.
191
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
192

193
        // MaxMcHistory defines the maximum number of payment results that are
194
        // held on disk.
195
        MaxMcHistory int
196

197
        // McFlushInterval defines the ticker interval when we flush the
198
        // accumulated state to the DB.
199
        McFlushInterval time.Duration
200

201
        // MinFailureRelaxInterval is the minimum time that must have passed
202
        // since the previously recorded failure before the failure amount may
203
        // be raised.
204
        MinFailureRelaxInterval time.Duration
205
}
206

207
func (c *MissionControlConfig) validate() error {
31✔
208
        if c.MaxMcHistory < 0 {
31✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
31✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
31✔
217
}
218

219
// String returns a string representation of a mission control config.
UNCOV
220
func (c *MissionControlConfig) String() string {
×
UNCOV
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
×
UNCOV
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
×
UNCOV
223
}
×
224

225
// TimedPairResult describes a timestamped pair result.
226
type TimedPairResult struct {
227
        // FailTime is the time of the last failure.
228
        FailTime time.Time
229

230
        // FailAmt is the amount of the last failure. This amount may be pushed
231
        // up if a later success is higher than the last failed amount.
232
        FailAmt lnwire.MilliSatoshi
233

234
        // SuccessTime is the time of the last success.
235
        SuccessTime time.Time
236

237
        // SuccessAmt is the highest amount that successfully forwarded. This
238
        // isn't necessarily the last success amount. The value of this field
239
        // may also be pushed down if a later failure is lower than the highest
240
        // success amount. Because of this, SuccessAmt may not match
241
        // SuccessTime.
242
        SuccessAmt lnwire.MilliSatoshi
243
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
246
// control.
247
type MissionControlSnapshot struct {
248
        // Pairs is a list of channels for which specific information is
249
        // logged.
250
        Pairs []MissionControlPairSnapshot
251
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
254
// state in mission control.
255
type MissionControlPairSnapshot struct {
256
        // Pair is the node pair of which the state is described.
257
        Pair DirectedNodePair
258

259
        // TimedPairResult contains the data for this pair.
260
        TimedPairResult
261
}
262

263
// paymentResult is the information that becomes available when a payment
264
// attempt completes.
265
type paymentResult struct {
266
        id        uint64
267
        timeFwd   tlv.RecordT[tlv.TlvType0, uint64]
268
        timeReply tlv.RecordT[tlv.TlvType1, uint64]
269
        route     tlv.RecordT[tlv.TlvType2, mcRoute]
270

271
        // failure holds information related to the failure of a payment. The
272
        // presence of this record indicates a payment failure. The absence of
273
        // this record indicates a successful payment.
274
        failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
275
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure *paymentFailure) *paymentResult {
76✔
280

76✔
281
        result := &paymentResult{
76✔
282
                id: id,
76✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
284
                        uint64(timeFwd.UnixNano()),
76✔
285
                ),
76✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
76✔
287
                        uint64(timeReply.UnixNano()),
76✔
288
                ),
76✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
76✔
290
        }
76✔
291

76✔
292
        if failure != nil {
129✔
293
                result.failure = tlv.SomeRecordT(
53✔
294
                        tlv.NewRecordT[tlv.TlvType3](*failure),
53✔
295
                )
53✔
296
        }
53✔
297

298
        return result
76✔
299
}
300

301
// NewMissionController returns a new instance of MissionController.
302
func NewMissionController(db kvdb.Backend, self route.Vertex,
303
        cfg *MissionControlConfig) (*MissionController, error) {
31✔
304

31✔
305
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
31✔
306
                cfg.Estimator)
31✔
307

31✔
308
        if err := cfg.validate(); err != nil {
31✔
309
                return nil, err
×
310
        }
×
311

312
        mcCfg := &mcConfig{
31✔
313
                clock:    clock.NewDefaultClock(),
31✔
314
                selfNode: self,
31✔
315
        }
31✔
316

31✔
317
        mgr := &MissionController{
31✔
318
                db:           db,
31✔
319
                defaultMCCfg: cfg,
31✔
320
                cfg:          mcCfg,
31✔
321
                mc:           make(map[string]*MissionControl),
31✔
322
        }
31✔
323

31✔
324
        if err := mgr.loadMissionControls(); err != nil {
31✔
325
                return nil, err
×
326
        }
×
327

328
        for _, mc := range mgr.mc {
63✔
329
                if err := mc.init(); err != nil {
32✔
330
                        return nil, err
×
331
                }
×
332
        }
333

334
        return mgr, nil
31✔
335
}
336

337
// loadMissionControls initialises a MissionControl in the default namespace if
338
// one does not yet exist. It then initialises a MissionControl for all other
339
// namespaces found in the DB.
340
//
341
// NOTE: this should only be called once during MissionController construction.
342
func (m *MissionController) loadMissionControls() error {
31✔
343
        m.mu.Lock()
31✔
344
        defer m.mu.Unlock()
31✔
345

31✔
346
        // Always initialise the default namespace.
31✔
347
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
31✔
348
        if err != nil {
31✔
349
                return err
×
350
        }
×
351

352
        namespaces := make(map[string]struct{})
31✔
353
        err = m.db.View(func(tx walletdb.ReadTx) error {
62✔
354
                mcStoreBkt := tx.ReadBucket(resultsKey)
31✔
355
                if mcStoreBkt == nil {
31✔
356
                        return fmt.Errorf("top level mission control bucket " +
×
357
                                "not found")
×
358
                }
×
359

360
                // Iterate through all the keys in the bucket and collect the
361
                // namespaces.
362
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
63✔
363
                        // We've already initialised the default namespace so
32✔
364
                        // we can skip it.
32✔
365
                        if string(k) == DefaultMissionControlNamespace {
63✔
366
                                return nil
31✔
367
                        }
31✔
368

369
                        namespaces[string(k)] = struct{}{}
1✔
370

1✔
371
                        return nil
1✔
372
                })
373
        }, func() {})
31✔
374
        if err != nil {
31✔
375
                return err
×
376
        }
×
377

378
        // Now, iterate through all the namespaces and initialise them.
379
        for ns := range namespaces {
32✔
380
                _, err = m.initMissionControl(ns)
1✔
381
                if err != nil {
1✔
382
                        return err
×
383
                }
×
384
        }
385

386
        return nil
31✔
387
}
388

389
// initMissionControl creates a new MissionControl instance with the given
390
// namespace if one does not yet exist.
391
//
392
// NOTE: the MissionController's mutex must be held before calling this method.
393
func (m *MissionController) initMissionControl(namespace string) (
394
        *MissionControl, error) {
33✔
395

33✔
396
        // If a mission control with this namespace has already been initialised
33✔
397
        // then there is nothing left to do.
33✔
398
        if mc, ok := m.mc[namespace]; ok {
33✔
399
                return mc, nil
×
400
        }
×
401

402
        cfg := m.defaultMCCfg
33✔
403

33✔
404
        store, err := newMissionControlStore(
33✔
405
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
33✔
406
                cfg.McFlushInterval,
33✔
407
        )
33✔
408
        if err != nil {
33✔
409
                return nil, err
×
410
        }
×
411

412
        mc := &MissionControl{
33✔
413
                cfg: m.cfg,
33✔
414
                state: newMissionControlState(
33✔
415
                        cfg.MinFailureRelaxInterval,
33✔
416
                ),
33✔
417
                store:          store,
33✔
418
                estimator:      cfg.Estimator,
33✔
419
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
33✔
420
                onConfigUpdate: cfg.OnConfigUpdate,
33✔
421
        }
33✔
422

33✔
423
        m.mc[namespace] = mc
33✔
424

33✔
425
        return mc, nil
33✔
426
}
427

428
// RunStoreTickers runs the mission controller store's tickers.
UNCOV
429
func (m *MissionController) RunStoreTickers() {
×
UNCOV
430
        m.mu.Lock()
×
UNCOV
431
        defer m.mu.Unlock()
×
UNCOV
432

×
UNCOV
433
        for _, mc := range m.mc {
×
UNCOV
434
                mc.store.run()
×
UNCOV
435
        }
×
436
}
437

438
// StopStoreTickers stops the mission control store's tickers.
UNCOV
439
func (m *MissionController) StopStoreTickers() {
×
UNCOV
440
        log.Debug("Stopping mission control store ticker")
×
UNCOV
441
        defer log.Debug("Mission control store ticker stopped")
×
UNCOV
442

×
UNCOV
443
        m.mu.Lock()
×
UNCOV
444
        defer m.mu.Unlock()
×
UNCOV
445

×
UNCOV
446
        for _, mc := range m.mc {
×
UNCOV
447
                mc.store.stop()
×
UNCOV
448
        }
×
449
}
450

451
// init initializes mission control with historical data.
452
func (m *MissionControl) init() error {
32✔
453
        m.log.Debugf("Mission control state reconstruction started")
32✔
454

32✔
455
        m.mu.Lock()
32✔
456
        defer m.mu.Unlock()
32✔
457

32✔
458
        start := time.Now()
32✔
459

32✔
460
        results, err := m.store.fetchAll()
32✔
461
        if err != nil {
32✔
462
                return err
×
463
        }
×
464

465
        for _, result := range results {
36✔
466
                _ = m.applyPaymentResult(result)
4✔
467
        }
4✔
468

469
        m.log.Debugf("Mission control state reconstruction finished: "+
32✔
470
                "n=%v, time=%v", len(results), time.Since(start))
32✔
471

32✔
472
        return nil
32✔
473
}
474

475
// GetConfig returns the config that mission control is currently configured
476
// with. All fields are copied by value, so we do not need to worry about
477
// mutation.
UNCOV
478
func (m *MissionControl) GetConfig() *MissionControlConfig {
×
UNCOV
479
        m.mu.Lock()
×
UNCOV
480
        defer m.mu.Unlock()
×
UNCOV
481

×
UNCOV
482
        return &MissionControlConfig{
×
UNCOV
483
                Estimator:               m.estimator,
×
UNCOV
484
                MaxMcHistory:            m.store.maxRecords,
×
UNCOV
485
                McFlushInterval:         m.store.flushInterval,
×
UNCOV
486
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
×
UNCOV
487
        }
×
UNCOV
488
}
×
489

490
// SetConfig validates the config provided and updates mission control's config
491
// if it is valid.
UNCOV
492
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
×
UNCOV
493
        if cfg == nil {
×
494
                return errors.New("nil mission control config")
×
495
        }
×
496

UNCOV
497
        if err := cfg.validate(); err != nil {
×
498
                return err
×
499
        }
×
500

UNCOV
501
        m.mu.Lock()
×
UNCOV
502
        defer m.mu.Unlock()
×
UNCOV
503

×
UNCOV
504
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
×
UNCOV
505
                cfg.Estimator)
×
UNCOV
506

×
UNCOV
507
        m.store.maxRecords = cfg.MaxMcHistory
×
UNCOV
508
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
×
UNCOV
509
        m.estimator = cfg.Estimator
×
UNCOV
510

×
UNCOV
511
        // Execute the callback function if it is set.
×
UNCOV
512
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
×
UNCOV
513
                f(cfg)
×
UNCOV
514
        })
×
515

UNCOV
516
        return nil
×
517
}
518

519
// ResetHistory resets the history of MissionControl returning it to a state as
520
// if no payment attempts have been made.
521
func (m *MissionControl) ResetHistory() error {
3✔
522
        m.mu.Lock()
3✔
523
        defer m.mu.Unlock()
3✔
524

3✔
525
        if err := m.store.clear(); err != nil {
3✔
526
                return err
×
527
        }
×
528

529
        m.state.resetHistory()
3✔
530

3✔
531
        m.log.Debugf("Mission control history cleared")
3✔
532

3✔
533
        return nil
3✔
534
}
535

536
// GetProbability is expected to return the success probability of a payment
537
// from fromNode along edge.
538
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
539
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
662✔
540

662✔
541
        m.mu.Lock()
662✔
542
        defer m.mu.Unlock()
662✔
543

662✔
544
        now := m.cfg.clock.Now()
662✔
545
        results, _ := m.state.getLastPairResult(fromNode)
662✔
546

662✔
547
        // Use a distinct probability estimation function for local channels.
662✔
548
        if fromNode == m.cfg.selfNode {
701✔
549
                return m.estimator.LocalPairProbability(now, results, toNode)
39✔
550
        }
39✔
551

552
        return m.estimator.PairProbability(
623✔
553
                now, results, toNode, amt, capacity,
623✔
554
        )
623✔
555
}
556

557
// GetHistorySnapshot takes a snapshot from the current mission control state
558
// and actual probability estimates.
559
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
1✔
560
        m.mu.Lock()
1✔
561
        defer m.mu.Unlock()
1✔
562

1✔
563
        m.log.Debugf("Requesting history snapshot from mission control")
1✔
564

1✔
565
        return m.state.getSnapshot()
1✔
566
}
1✔
567

568
// ImportHistory imports the set of mission control results provided to our
569
// in-memory state. These results are not persisted, so will not survive
570
// restarts.
571
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
UNCOV
572
        force bool) error {
×
UNCOV
573

×
UNCOV
574
        if history == nil {
×
575
                return errors.New("cannot import nil history")
×
576
        }
×
577

UNCOV
578
        m.mu.Lock()
×
UNCOV
579
        defer m.mu.Unlock()
×
UNCOV
580

×
UNCOV
581
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
×
UNCOV
582
                "control", len(history.Pairs))
×
UNCOV
583

×
UNCOV
584
        imported := m.state.importSnapshot(history, force)
×
UNCOV
585

×
UNCOV
586
        m.log.Infof("Imported %v results to mission control", imported)
×
UNCOV
587

×
UNCOV
588
        return nil
×
589
}
590

591
// GetPairHistorySnapshot returns the stored history for a given node pair.
592
func (m *MissionControl) GetPairHistorySnapshot(
UNCOV
593
        fromNode, toNode route.Vertex) TimedPairResult {
×
UNCOV
594

×
UNCOV
595
        m.mu.Lock()
×
UNCOV
596
        defer m.mu.Unlock()
×
UNCOV
597

×
UNCOV
598
        results, ok := m.state.getLastPairResult(fromNode)
×
UNCOV
599
        if !ok {
×
UNCOV
600
                return TimedPairResult{}
×
UNCOV
601
        }
×
602

UNCOV
603
        result, ok := results[toNode]
×
UNCOV
604
        if !ok {
×
605
                return TimedPairResult{}
×
606
        }
×
607

UNCOV
608
        return result
×
609
}
610

611
// ReportPaymentFail reports a failed payment to mission control as input for
612
// future probability estimates. The failureSourceIdx argument indicates the
613
// failure source. If it is nil, the failure source is unknown. This function
614
// returns a reason if this failure is a final failure. In that case no further
615
// payment attempts need to be made.
616
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
617
        failureSourceIdx *int, failure lnwire.FailureMessage) (
618
        *channeldb.FailureReason, error) {
46✔
619

46✔
620
        timestamp := m.cfg.clock.Now()
46✔
621

46✔
622
        result := newPaymentResult(
46✔
623
                paymentID, extractMCRoute(rt), timestamp, timestamp,
46✔
624
                newPaymentFailure(failureSourceIdx, failure),
46✔
625
        )
46✔
626

46✔
627
        return m.processPaymentResult(result)
46✔
628
}
46✔
629

630
// ReportPaymentSuccess reports a successful payment to mission control as input
631
// for future probability estimates.
632
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
633
        rt *route.Route) error {
22✔
634

22✔
635
        timestamp := m.cfg.clock.Now()
22✔
636

22✔
637
        result := newPaymentResult(
22✔
638
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
22✔
639
        )
22✔
640

22✔
641
        _, err := m.processPaymentResult(result)
22✔
642

22✔
643
        return err
22✔
644
}
22✔
645

646
// processPaymentResult stores a payment result in the mission control store and
647
// updates mission control's in-memory state.
648
func (m *MissionControl) processPaymentResult(result *paymentResult) (
649
        *channeldb.FailureReason, error) {
68✔
650

68✔
651
        // Store complete result in database.
68✔
652
        m.store.AddResult(result)
68✔
653

68✔
654
        m.mu.Lock()
68✔
655
        defer m.mu.Unlock()
68✔
656

68✔
657
        // Apply result to update mission control state.
68✔
658
        reason := m.applyPaymentResult(result)
68✔
659

68✔
660
        return reason, nil
68✔
661
}
68✔
662

663
// applyPaymentResult applies a payment result as input for future probability
664
// estimates. It returns a bool indicating whether this error is a final error
665
// and no further payment attempts need to be made.
666
func (m *MissionControl) applyPaymentResult(
667
        result *paymentResult) *channeldb.FailureReason {
72✔
668

72✔
669
        // Interpret result.
72✔
670
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
72✔
671

72✔
672
        if i.policyFailure != nil {
81✔
673
                if m.state.requestSecondChance(
9✔
674
                        time.Unix(0, int64(result.timeReply.Val)),
9✔
675
                        i.policyFailure.From, i.policyFailure.To,
9✔
676
                ) {
15✔
677
                        return nil
6✔
678
                }
6✔
679
        }
680

681
        // If there is a node-level failure, record a failure for every tried
682
        // connection of that node. A node-level failure can be considered as a
683
        // failure that would have occurred with any of the node's channels.
684
        //
685
        // Ideally we'd also record the failure for the untried connections of
686
        // the node. Unfortunately this would require access to the graph and
687
        // adding this dependency and db calls does not outweigh the benefits.
688
        //
689
        // Untried connections will fall back to the node probability. After the
690
        // call to setAllPairResult below, the node probability will be equal to
691
        // the probability of the tried channels except that the a priori
692
        // probability is mixed in too. This effect is controlled by the
693
        // aprioriWeight parameter. If that parameter isn't set to an extreme
694
        // and there are a few known connections, there shouldn't be much of a
695
        // difference. The largest difference occurs when aprioriWeight is 1. In
696
        // that case, a node-level failure would not be applied to untried
697
        // channels.
698
        if i.nodeFailure != nil {
72✔
699
                m.log.Debugf("Reporting node failure to Mission Control: "+
6✔
700
                        "node=%v", *i.nodeFailure)
6✔
701

6✔
702
                m.state.setAllFail(
6✔
703
                        *i.nodeFailure,
6✔
704
                        time.Unix(0, int64(result.timeReply.Val)),
6✔
705
                )
6✔
706
        }
6✔
707

708
        for pair, pairResult := range i.pairResults {
212✔
709
                pairResult := pairResult
146✔
710

146✔
711
                if pairResult.success {
227✔
712
                        m.log.Debugf("Reporting pair success to Mission "+
81✔
713
                                "Control: pair=%v, amt=%v",
81✔
714
                                pair, pairResult.amt)
81✔
715
                } else {
146✔
716
                        m.log.Debugf("Reporting pair failure to Mission "+
65✔
717
                                "Control: pair=%v, amt=%v",
65✔
718
                                pair, pairResult.amt)
65✔
719
                }
65✔
720

721
                m.state.setLastPairResult(
146✔
722
                        pair.From, pair.To,
146✔
723
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
146✔
724
                        false,
146✔
725
                )
146✔
726
        }
727

728
        return i.finalFailureReason
66✔
729
}
730

731
// namespacedDB is an implementation of the missionControlDB that gives a user
732
// of the interface access to a namespaced bucket within the top level mission
733
// control bucket.
734
type namespacedDB struct {
735
        topLevelBucketKey []byte
736
        namespace         []byte
737
        db                kvdb.Backend
738
}
739

740
// A compile-time check to ensure that namespacedDB implements missionControlDB.
741
var _ missionControlDB = (*namespacedDB)(nil)
742

743
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
744
// default namespace.
745
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
4✔
746
        return newNamespacedDB(db, DefaultMissionControlNamespace)
4✔
747
}
4✔
748

749
// newNamespacedDB creates a new instance of missionControlDB where the DB will
750
// have access to a namespaced bucket within the top level mission control
751
// bucket.
752
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
37✔
753
        return &namespacedDB{
37✔
754
                db:                db,
37✔
755
                namespace:         []byte(namespace),
37✔
756
                topLevelBucketKey: resultsKey,
37✔
757
        }
37✔
758
}
37✔
759

760
// update can be used to perform reads and writes on the given bucket.
761
//
762
// NOTE: this is part of the missionControlDB interface.
763
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
764
        reset func()) error {
46✔
765

46✔
766
        return n.db.Update(func(tx kvdb.RwTx) error {
92✔
767
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
46✔
768
                if err != nil {
46✔
769
                        return fmt.Errorf("cannot create top level mission "+
×
770
                                "control bucket: %w", err)
×
771
                }
×
772

773
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
46✔
774
                        n.namespace,
46✔
775
                )
46✔
776
                if err != nil {
46✔
777
                        return fmt.Errorf("cannot create namespaced bucket "+
×
778
                                "(%s) in mission control store: %w",
×
779
                                n.namespace, err)
×
780
                }
×
781

782
                return f(namespacedBkt)
46✔
783
        }, reset)
784
}
785

786
// view can be used to perform reads on the given bucket.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
790
        reset func()) error {
47✔
791

47✔
792
        return n.db.View(func(tx kvdb.RTx) error {
94✔
793
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
47✔
794
                if mcStoreBkt == nil {
47✔
795
                        return fmt.Errorf("top level mission control bucket " +
×
796
                                "not found")
×
797
                }
×
798

799
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
47✔
800
                if namespacedBkt == nil {
47✔
801
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
802
                                "in mission control store", n.namespace)
×
803
                }
×
804

805
                return f(namespacedBkt)
47✔
806
        }, reset)
807
}
808

809
// purge will delete all the contents in the namespace.
810
//
811
// NOTE: this is part of the missionControlDB interface.
812
func (n *namespacedDB) purge() error {
3✔
813
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
814
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
815
                if mcStoreBkt == nil {
3✔
816
                        return nil
×
817
                }
×
818

819
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
820
                if err != nil {
3✔
821
                        return err
×
822
                }
×
823

824
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
825

3✔
826
                return err
3✔
827
        }, func() {})
3✔
828
}
829

830
// paymentFailure represents the presence of a payment failure. It may or may
831
// not include additional information about said failure.
832
type paymentFailure struct {
833
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
834
}
835

836
// newPaymentFailure constructs a new paymentFailure struct. If the source
837
// index is nil, then an empty paymentFailure is returned. This represents a
838
// failure with unknown details. Otherwise, the index and failure message are
839
// used to populate the info field of the paymentFailure.
840
func newPaymentFailure(sourceIdx *int,
841
        failureMsg lnwire.FailureMessage) *paymentFailure {
77✔
842

77✔
843
        if sourceIdx == nil {
78✔
844
                return &paymentFailure{}
1✔
845
        }
1✔
846

847
        info := paymentFailureInfo{
76✔
848
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
76✔
849
                        uint8(*sourceIdx),
76✔
850
                ),
76✔
851
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
76✔
852
        }
76✔
853

76✔
854
        return &paymentFailure{
76✔
855
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
76✔
856
        }
76✔
857
}
858

859
// Record returns a TLV record that can be used to encode/decode a
860
// paymentFailure to/from a TLV stream.
861
func (r *paymentFailure) Record() tlv.Record {
34✔
862
        recordSize := func() uint64 {
45✔
863
                var (
11✔
864
                        b   bytes.Buffer
11✔
865
                        buf [8]byte
11✔
866
                )
11✔
867
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
11✔
868
                        panic(err)
×
869
                }
870

871
                return uint64(len(b.Bytes()))
11✔
872
        }
873

874
        return tlv.MakeDynamicRecord(
34✔
875
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
34✔
876
        )
34✔
877
}
878

879
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
22✔
880
        if v, ok := val.(*paymentFailure); ok {
44✔
881
                var recordProducers []tlv.RecordProducer
22✔
882
                v.info.WhenSome(
22✔
883
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
44✔
884
                                recordProducers = append(recordProducers, &r)
22✔
885
                        },
22✔
886
                )
887

888
                return lnwire.EncodeRecordsTo(
22✔
889
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
22✔
890
                )
22✔
891
        }
892

893
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
894
}
895

896
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
897
        l uint64) error {
21✔
898

21✔
899
        if v, ok := val.(*paymentFailure); ok {
42✔
900
                var h paymentFailure
21✔
901

21✔
902
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
21✔
903
                typeMap, err := lnwire.DecodeRecords(
21✔
904
                        r, lnwire.ProduceRecordsSorted(&info)...,
21✔
905
                )
21✔
906
                if err != nil {
21✔
907
                        return err
×
908
                }
×
909

910
                if _, ok := typeMap[h.info.TlvType()]; ok {
42✔
911
                        h.info = tlv.SomeRecordT(info)
21✔
912
                }
21✔
913

914
                *v = h
21✔
915

21✔
916
                return nil
21✔
917
        }
918

919
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
920
}
921

922
// paymentFailureInfo holds additional information about a payment failure.
923
type paymentFailureInfo struct {
924
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
925
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
926
}
927

928
// Record returns a TLV record that can be used to encode/decode a
929
// paymentFailureInfo to/from a TLV stream.
930
func (r *paymentFailureInfo) Record() tlv.Record {
43✔
931
        recordSize := func() uint64 {
65✔
932
                var (
22✔
933
                        b   bytes.Buffer
22✔
934
                        buf [8]byte
22✔
935
                )
22✔
936
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
22✔
937
                        panic(err)
×
938
                }
939

940
                return uint64(len(b.Bytes()))
22✔
941
        }
942

943
        return tlv.MakeDynamicRecord(
43✔
944
                0, r, recordSize, encodePaymentFailureInfo,
43✔
945
                decodePaymentFailureInfo,
43✔
946
        )
43✔
947
}
948

949
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
44✔
950
        if v, ok := val.(*paymentFailureInfo); ok {
88✔
951
                return lnwire.EncodeRecordsTo(
44✔
952
                        w, lnwire.ProduceRecordsSorted(
44✔
953
                                &v.sourceIdx, &v.msg,
44✔
954
                        ),
44✔
955
                )
44✔
956
        }
44✔
957

958
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
959
}
960

961
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
962
        l uint64) error {
21✔
963

21✔
964
        if v, ok := val.(*paymentFailureInfo); ok {
42✔
965
                var h paymentFailureInfo
21✔
966

21✔
967
                _, err := lnwire.DecodeRecords(
21✔
968
                        r,
21✔
969
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
21✔
970
                )
21✔
971
                if err != nil {
21✔
972
                        return err
×
973
                }
×
974

975
                *v = h
21✔
976

21✔
977
                return nil
21✔
978
        }
979

980
        return tlv.NewTypeForDecodingErr(
×
981
                val, "routing.paymentFailureInfo", l, l,
×
982
        )
×
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc