• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12115442155

02 Dec 2024 08:28AM UTC coverage: 48.662% (-10.3%) from 58.948%
12115442155

Pull #9175

github

ellemouton
netann: update ChanAnn2 validation to work for P2WSH channels

This commit expands the ChannelAnnouncement2 validation for the case
where it is announcing a P2WSH channel.
Pull Request #9175: lnwire+netann: update structure of g175 messages to be pure TLV

6 of 314 new or added lines in 9 files covered. (1.91%)

27532 existing lines in 434 files now uncovered.

97890 of 201164 relevant lines covered (48.66%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.52
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btclog/v2"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/clock"
16
        "github.com/lightningnetwork/lnd/fn"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
	// DefaultPenaltyHalfLife is the default half-life duration. The
	// half-life duration defines after how much time a penalized node or
	// channel is back at 50% probability.
	DefaultPenaltyHalfLife = time.Hour

	// minSecondChanceInterval is the minimum time required between
	// second-chance failures.
	//
	// If nodes return a channel policy related failure, they may get a
	// second chance to forward the payment. It could be that the channel
	// policy that we are aware of is not up to date. This is especially
	// important in case of mobile apps that are mostly offline.
	//
	// However, we don't want to give nodes the option to endlessly return
	// new channel updates so that we are kept busy trying to route through
	// that node until the payment loop times out.
	//
	// Therefore we only grant a second chance to a node if the previous
	// second chance is sufficiently long ago. This is what
	// minSecondChanceInterval defines. If a second policy failure comes in
	// within that interval, we will apply a penalty.
	//
	// Second chances granted are tracked on the level of node pairs. This
	// means that if a node has multiple channels to the same peer, they
	// will only get a single second chance to route to that peer again.
	// Nodes forward non-strict, so it isn't necessary to apply a less
	// restrictive channel level tracking scheme here.
	minSecondChanceInterval = time.Minute

	// DefaultMaxMcHistory is the default maximum history size.
	// NOTE(review): presumably the default for
	// MissionControlConfig.MaxMcHistory, i.e. a number of stored payment
	// results - confirm against the config wiring.
	DefaultMaxMcHistory = 1000

	// DefaultMcFlushInterval is the default interval we use to flush MC
	// state to the database.
	DefaultMcFlushInterval = time.Second

	// prevSuccessProbability is the assumed probability for node pairs that
	// successfully relayed the previous attempt.
	prevSuccessProbability = 0.95

	// DefaultAprioriWeight is the default a priori weight.
	// NOTE(review): this comment used to point at MissionControlConfig for
	// further explanation, but that struct has no a priori weight field in
	// this file - the explanation presumably lives with the estimator
	// configuration; confirm and link it.
	DefaultAprioriWeight = 0.5

	// DefaultMinFailureRelaxInterval is the default minimum time that must
	// have passed since the previously recorded failure before the failure
	// amount may be raised.
	DefaultMinFailureRelaxInterval = time.Minute

	// DefaultFeeEstimationTimeout is the default value for
	// FeeEstimationTimeout. It defines the maximum duration that the
	// probing fee estimation is allowed to take.
	DefaultFeeEstimationTimeout = time.Minute

	// DefaultMissionControlNamespace is the name of the default mission
	// control namespace. This is used as the sub-bucket key within the
	// top level DB bucket to store mission control results.
	DefaultMissionControlNamespace = "default"
)
83

84
var (
	// ErrInvalidMcHistory is returned by MissionControlConfig validation
	// when MaxMcHistory is negative.
	ErrInvalidMcHistory = errors.New("mission control history must be " +
		">= 0")

	// ErrInvalidFailureInterval is returned by MissionControlConfig
	// validation when MinFailureRelaxInterval is negative.
	ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
)
94

95
// NodeResults contains previous results from a node to its peers. The map is
// keyed by the peer's vertex and holds the timed result recorded for the pair
// formed with that peer.
type NodeResults map[route.Vertex]TimedPairResult
97

98
// mcConfig holds various config members that will be required by all
// MissionControl instances and will be the same regardless of namespace. A
// single instance is shared (by pointer) between the MissionController and
// every MissionControl it creates.
type mcConfig struct {
	// clock is a time source used by mission control. It supplies the
	// timestamps for reported payment results and probability queries.
	clock clock.Clock

	// selfNode is our pubkey. Probability queries that start at this node
	// use the estimator's local pair probability.
	selfNode route.Vertex
}
107

108
// MissionControl contains state which summarizes the past attempts of HTLC
// routing by external callers when sending payments throughout the network. It
// acts as a shared memory during routing attempts with the goal to optimize the
// payment attempt success rate.
//
// Failed payment attempts are reported to mission control. These reports are
// used to track the time of the last node or channel level failure. The time
// since the last failure is used to estimate a success probability that is fed
// into the path finding process for subsequent payment attempts.
type MissionControl struct {
	// cfg holds the settings shared by all namespaces (clock and our own
	// node key).
	cfg *mcConfig

	// state is the internal mission control state that is input for
	// probability estimation.
	state *missionControlState

	// store persists the reported payment results for this namespace.
	store *missionControlStore

	// estimator is the probability estimator that is used with the payment
	// results that mission control collects.
	estimator Estimator

	// onConfigUpdate is an optional function that SetConfig calls with the
	// new config after it has been applied.
	onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]

	// log is the logger for this instance; it is created with a prefix
	// derived from the namespace.
	log btclog.Logger

	// mu is held by the methods of this type while they read or modify
	// state, estimator and the store's configuration fields.
	mu sync.Mutex
}
138

139
// MissionController manages MissionControl instances in various namespaces.
type MissionController struct {
	// db is the backend in which every namespace keeps its own bucket.
	db kvdb.Backend

	// cfg holds the settings shared by all MissionControl instances.
	cfg *mcConfig

	// defaultMCCfg is the config that each newly initialised
	// MissionControl is created from.
	defaultMCCfg *MissionControlConfig

	// mc maps a namespace name to its MissionControl instance.
	mc map[string]*MissionControl

	// mu guards the mc map.
	mu sync.Mutex

	// TODO(roasbeef): further counters, if vertex continually unavailable,
	// add to another generation

	// TODO(roasbeef): also add favorable metrics for nodes
}
153

154
// GetNamespacedStore returns the MissionControl in the given namespace. If one
155
// does not yet exist, then it is initialised.
156
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
157
        error) {
1✔
158

1✔
159
        m.mu.Lock()
1✔
160
        defer m.mu.Unlock()
1✔
161

1✔
162
        if mc, ok := m.mc[ns]; ok {
2✔
163
                return mc, nil
1✔
164
        }
1✔
165

UNCOV
166
        return m.initMissionControl(ns)
×
167
}
168

169
// ListNamespaces returns a list of the namespaces that the MissionController
170
// is aware of.
UNCOV
171
func (m *MissionController) ListNamespaces() []string {
×
UNCOV
172
        m.mu.Lock()
×
UNCOV
173
        defer m.mu.Unlock()
×
UNCOV
174

×
UNCOV
175
        namespaces := make([]string, 0, len(m.mc))
×
UNCOV
176
        for ns := range m.mc {
×
UNCOV
177
                namespaces = append(namespaces, ns)
×
UNCOV
178
        }
×
179

UNCOV
180
        return namespaces
×
181
}
182

183
// MissionControlConfig defines parameters that control mission control
// behaviour.
type MissionControlConfig struct {
	// Estimator gives probability estimates for node pairs.
	Estimator Estimator

	// OnConfigUpdate is an optional function that is called with the new
	// config whenever the mission control config is updated through
	// SetConfig.
	OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]

	// MaxMcHistory defines the maximum number of payment results that are
	// held on disk. It must not be negative.
	MaxMcHistory int

	// McFlushInterval defines the ticker interval when we flush the
	// accumulated state to the DB.
	McFlushInterval time.Duration

	// MinFailureRelaxInterval is the minimum time that must have passed
	// since the previously recorded failure before the failure amount may
	// be raised. It must not be negative.
	MinFailureRelaxInterval time.Duration
}
206

207
func (c *MissionControlConfig) validate() error {
1✔
208
        if c.MaxMcHistory < 0 {
1✔
209
                return ErrInvalidMcHistory
×
210
        }
×
211

212
        if c.MinFailureRelaxInterval < 0 {
1✔
213
                return ErrInvalidFailureInterval
×
214
        }
×
215

216
        return nil
1✔
217
}
218

219
// String returns a string representation of a mission control config.
220
func (c *MissionControlConfig) String() string {
1✔
221
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
1✔
222
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
1✔
223
}
1✔
224

225
// TimedPairResult describes a timestamped pair result. It keeps the most
// recent failure and the success information for one node pair.
type TimedPairResult struct {
	// FailTime is the time of the last failure.
	FailTime time.Time

	// FailAmt is the amount of the last failure. This amount may be pushed
	// up if a later success is higher than the last failed amount.
	FailAmt lnwire.MilliSatoshi

	// SuccessTime is the time of the last success.
	SuccessTime time.Time

	// SuccessAmt is the highest amount that successfully forwarded. This
	// isn't necessarily the last success amount. The value of this field
	// may also be pushed down if a later failure is lower than the highest
	// success amount. Because of this, SuccessAmt may not match
	// SuccessTime.
	SuccessAmt lnwire.MilliSatoshi
}
244

245
// MissionControlSnapshot contains a snapshot of the current state of mission
// control. It is the type returned by GetHistorySnapshot and accepted by
// ImportHistory.
type MissionControlSnapshot struct {
	// Pairs is a list of node pairs for which specific information is
	// logged.
	Pairs []MissionControlPairSnapshot
}
252

253
// MissionControlPairSnapshot contains a snapshot of the current node pair
// state in mission control.
type MissionControlPairSnapshot struct {
	// Pair is the node pair of which the state is described.
	Pair DirectedNodePair

	// TimedPairResult contains the data for this pair. It is embedded, so
	// its fields are accessible directly on the snapshot.
	TimedPairResult
}
262

263
// paymentResult is the information that becomes available when a payment
// attempt completes. All members other than id are TLV records.
type paymentResult struct {
	// id is the payment identifier the result was reported with.
	id uint64

	// timeFwd is the forward time as Unix nanoseconds.
	timeFwd tlv.RecordT[tlv.TlvType0, uint64]

	// timeReply is the reply time as Unix nanoseconds.
	timeReply tlv.RecordT[tlv.TlvType1, uint64]

	// route is the route that the attempt was made over.
	route tlv.RecordT[tlv.TlvType2, mcRoute]

	// failure holds information related to the failure of a payment. The
	// presence of this record indicates a payment failure. The absence of
	// this record indicates a successful payment.
	failure tlv.OptionalRecordT[tlv.TlvType3, paymentFailure]
}
276

277
// newPaymentResult constructs a new paymentResult.
278
func newPaymentResult(id uint64, rt *mcRoute, timeFwd, timeReply time.Time,
279
        failure *paymentFailure) *paymentResult {
1✔
280

1✔
281
        result := &paymentResult{
1✔
282
                id: id,
1✔
283
                timeFwd: tlv.NewPrimitiveRecord[tlv.TlvType0](
1✔
284
                        uint64(timeFwd.UnixNano()),
1✔
285
                ),
1✔
286
                timeReply: tlv.NewPrimitiveRecord[tlv.TlvType1](
1✔
287
                        uint64(timeReply.UnixNano()),
1✔
288
                ),
1✔
289
                route: tlv.NewRecordT[tlv.TlvType2](*rt),
1✔
290
        }
1✔
291

1✔
292
        if failure != nil {
2✔
293
                result.failure = tlv.SomeRecordT(
1✔
294
                        tlv.NewRecordT[tlv.TlvType3](*failure),
1✔
295
                )
1✔
296
        }
1✔
297

298
        return result
1✔
299
}
300

301
// NewMissionController returns a new instance of MissionController.
302
func NewMissionController(db kvdb.Backend, self route.Vertex,
303
        cfg *MissionControlConfig) (*MissionController, error) {
1✔
304

1✔
305
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
1✔
306
                cfg.Estimator)
1✔
307

1✔
308
        if err := cfg.validate(); err != nil {
1✔
309
                return nil, err
×
310
        }
×
311

312
        mcCfg := &mcConfig{
1✔
313
                clock:    clock.NewDefaultClock(),
1✔
314
                selfNode: self,
1✔
315
        }
1✔
316

1✔
317
        mgr := &MissionController{
1✔
318
                db:           db,
1✔
319
                defaultMCCfg: cfg,
1✔
320
                cfg:          mcCfg,
1✔
321
                mc:           make(map[string]*MissionControl),
1✔
322
        }
1✔
323

1✔
324
        if err := mgr.loadMissionControls(); err != nil {
1✔
325
                return nil, err
×
326
        }
×
327

328
        for _, mc := range mgr.mc {
2✔
329
                if err := mc.init(); err != nil {
1✔
330
                        return nil, err
×
331
                }
×
332
        }
333

334
        return mgr, nil
1✔
335
}
336

337
// loadMissionControls initialises a MissionControl in the default namespace if
338
// one does not yet exist. It then initialises a MissionControl for all other
339
// namespaces found in the DB.
340
//
341
// NOTE: this should only be called once during MissionController construction.
342
func (m *MissionController) loadMissionControls() error {
1✔
343
        m.mu.Lock()
1✔
344
        defer m.mu.Unlock()
1✔
345

1✔
346
        // Always initialise the default namespace.
1✔
347
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
1✔
348
        if err != nil {
1✔
349
                return err
×
350
        }
×
351

352
        namespaces := make(map[string]struct{})
1✔
353
        err = m.db.View(func(tx walletdb.ReadTx) error {
2✔
354
                mcStoreBkt := tx.ReadBucket(resultsKey)
1✔
355
                if mcStoreBkt == nil {
1✔
356
                        return fmt.Errorf("top level mission control bucket " +
×
357
                                "not found")
×
358
                }
×
359

360
                // Iterate through all the keys in the bucket and collect the
361
                // namespaces.
362
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
2✔
363
                        // We've already initialised the default namespace so
1✔
364
                        // we can skip it.
1✔
365
                        if string(k) == DefaultMissionControlNamespace {
2✔
366
                                return nil
1✔
367
                        }
1✔
368

UNCOV
369
                        namespaces[string(k)] = struct{}{}
×
UNCOV
370

×
UNCOV
371
                        return nil
×
372
                })
373
        }, func() {})
1✔
374
        if err != nil {
1✔
375
                return err
×
376
        }
×
377

378
        // Now, iterate through all the namespaces and initialise them.
379
        for ns := range namespaces {
1✔
UNCOV
380
                _, err = m.initMissionControl(ns)
×
UNCOV
381
                if err != nil {
×
382
                        return err
×
383
                }
×
384
        }
385

386
        return nil
1✔
387
}
388

389
// initMissionControl creates a new MissionControl instance with the given
390
// namespace if one does not yet exist.
391
//
392
// NOTE: the MissionController's mutex must be held before calling this method.
393
func (m *MissionController) initMissionControl(namespace string) (
394
        *MissionControl, error) {
1✔
395

1✔
396
        // If a mission control with this namespace has already been initialised
1✔
397
        // then there is nothing left to do.
1✔
398
        if mc, ok := m.mc[namespace]; ok {
1✔
399
                return mc, nil
×
400
        }
×
401

402
        cfg := m.defaultMCCfg
1✔
403

1✔
404
        store, err := newMissionControlStore(
1✔
405
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
1✔
406
                cfg.McFlushInterval,
1✔
407
        )
1✔
408
        if err != nil {
1✔
409
                return nil, err
×
410
        }
×
411

412
        mc := &MissionControl{
1✔
413
                cfg: m.cfg,
1✔
414
                state: newMissionControlState(
1✔
415
                        cfg.MinFailureRelaxInterval,
1✔
416
                ),
1✔
417
                store:          store,
1✔
418
                estimator:      cfg.Estimator,
1✔
419
                log:            log.WithPrefix(fmt.Sprintf("[%s]:", namespace)),
1✔
420
                onConfigUpdate: cfg.OnConfigUpdate,
1✔
421
        }
1✔
422

1✔
423
        m.mc[namespace] = mc
1✔
424

1✔
425
        return mc, nil
1✔
426
}
427

428
// RunStoreTickers runs the mission controller store's tickers.
429
func (m *MissionController) RunStoreTickers() {
1✔
430
        m.mu.Lock()
1✔
431
        defer m.mu.Unlock()
1✔
432

1✔
433
        for _, mc := range m.mc {
2✔
434
                mc.store.run()
1✔
435
        }
1✔
436
}
437

438
// StopStoreTickers stops the mission control store's tickers.
439
func (m *MissionController) StopStoreTickers() {
1✔
440
        log.Debug("Stopping mission control store ticker")
1✔
441
        defer log.Debug("Mission control store ticker stopped")
1✔
442

1✔
443
        m.mu.Lock()
1✔
444
        defer m.mu.Unlock()
1✔
445

1✔
446
        for _, mc := range m.mc {
2✔
447
                mc.store.stop()
1✔
448
        }
1✔
449
}
450

451
// init initializes mission control with historical data.
452
func (m *MissionControl) init() error {
1✔
453
        m.log.Debugf("Mission control state reconstruction started")
1✔
454

1✔
455
        m.mu.Lock()
1✔
456
        defer m.mu.Unlock()
1✔
457

1✔
458
        start := time.Now()
1✔
459

1✔
460
        results, err := m.store.fetchAll()
1✔
461
        if err != nil {
1✔
462
                return err
×
463
        }
×
464

465
        for _, result := range results {
2✔
466
                _ = m.applyPaymentResult(result)
1✔
467
        }
1✔
468

469
        m.log.Debugf("Mission control state reconstruction finished: "+
1✔
470
                "n=%v, time=%v", len(results), time.Since(start))
1✔
471

1✔
472
        return nil
1✔
473
}
474

475
// GetConfig returns the config that mission control is currently configured
476
// with. All fields are copied by value, so we do not need to worry about
477
// mutation.
478
func (m *MissionControl) GetConfig() *MissionControlConfig {
1✔
479
        m.mu.Lock()
1✔
480
        defer m.mu.Unlock()
1✔
481

1✔
482
        return &MissionControlConfig{
1✔
483
                Estimator:               m.estimator,
1✔
484
                MaxMcHistory:            m.store.maxRecords,
1✔
485
                McFlushInterval:         m.store.flushInterval,
1✔
486
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
1✔
487
        }
1✔
488
}
1✔
489

490
// SetConfig validates the config provided and updates mission control's config
491
// if it is valid.
492
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
1✔
493
        if cfg == nil {
1✔
494
                return errors.New("nil mission control config")
×
495
        }
×
496

497
        if err := cfg.validate(); err != nil {
1✔
498
                return err
×
499
        }
×
500

501
        m.mu.Lock()
1✔
502
        defer m.mu.Unlock()
1✔
503

1✔
504
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
1✔
505
                cfg.Estimator)
1✔
506

1✔
507
        m.store.maxRecords = cfg.MaxMcHistory
1✔
508
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
1✔
509
        m.estimator = cfg.Estimator
1✔
510

1✔
511
        // Execute the callback function if it is set.
1✔
512
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
2✔
513
                f(cfg)
1✔
514
        })
1✔
515

516
        return nil
1✔
517
}
518

519
// ResetHistory resets the history of MissionControl returning it to a state as
520
// if no payment attempts have been made.
521
func (m *MissionControl) ResetHistory() error {
1✔
522
        m.mu.Lock()
1✔
523
        defer m.mu.Unlock()
1✔
524

1✔
525
        if err := m.store.clear(); err != nil {
1✔
526
                return err
×
527
        }
×
528

529
        m.state.resetHistory()
1✔
530

1✔
531
        m.log.Debugf("Mission control history cleared")
1✔
532

1✔
533
        return nil
1✔
534
}
535

536
// GetProbability is expected to return the success probability of a payment
537
// from fromNode along edge.
538
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
539
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
1✔
540

1✔
541
        m.mu.Lock()
1✔
542
        defer m.mu.Unlock()
1✔
543

1✔
544
        now := m.cfg.clock.Now()
1✔
545
        results, _ := m.state.getLastPairResult(fromNode)
1✔
546

1✔
547
        // Use a distinct probability estimation function for local channels.
1✔
548
        if fromNode == m.cfg.selfNode {
2✔
549
                return m.estimator.LocalPairProbability(now, results, toNode)
1✔
550
        }
1✔
551

552
        return m.estimator.PairProbability(
1✔
553
                now, results, toNode, amt, capacity,
1✔
554
        )
1✔
555
}
556

557
// GetHistorySnapshot takes a snapshot from the current mission control state
558
// and actual probability estimates.
UNCOV
559
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
×
UNCOV
560
        m.mu.Lock()
×
UNCOV
561
        defer m.mu.Unlock()
×
UNCOV
562

×
UNCOV
563
        m.log.Debugf("Requesting history snapshot from mission control")
×
UNCOV
564

×
UNCOV
565
        return m.state.getSnapshot()
×
UNCOV
566
}
×
567

568
// ImportHistory imports the set of mission control results provided to our
569
// in-memory state. These results are not persisted, so will not survive
570
// restarts.
571
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
572
        force bool) error {
1✔
573

1✔
574
        if history == nil {
1✔
575
                return errors.New("cannot import nil history")
×
576
        }
×
577

578
        m.mu.Lock()
1✔
579
        defer m.mu.Unlock()
1✔
580

1✔
581
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
1✔
582
                "control", len(history.Pairs))
1✔
583

1✔
584
        imported := m.state.importSnapshot(history, force)
1✔
585

1✔
586
        m.log.Infof("Imported %v results to mission control", imported)
1✔
587

1✔
588
        return nil
1✔
589
}
590

591
// GetPairHistorySnapshot returns the stored history for a given node pair.
592
func (m *MissionControl) GetPairHistorySnapshot(
593
        fromNode, toNode route.Vertex) TimedPairResult {
1✔
594

1✔
595
        m.mu.Lock()
1✔
596
        defer m.mu.Unlock()
1✔
597

1✔
598
        results, ok := m.state.getLastPairResult(fromNode)
1✔
599
        if !ok {
2✔
600
                return TimedPairResult{}
1✔
601
        }
1✔
602

603
        result, ok := results[toNode]
1✔
604
        if !ok {
2✔
605
                return TimedPairResult{}
1✔
606
        }
1✔
607

608
        return result
1✔
609
}
610

611
// ReportPaymentFail reports a failed payment to mission control as input for
612
// future probability estimates. The failureSourceIdx argument indicates the
613
// failure source. If it is nil, the failure source is unknown. This function
614
// returns a reason if this failure is a final failure. In that case no further
615
// payment attempts need to be made.
616
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
617
        failureSourceIdx *int, failure lnwire.FailureMessage) (
618
        *channeldb.FailureReason, error) {
1✔
619

1✔
620
        timestamp := m.cfg.clock.Now()
1✔
621

1✔
622
        result := newPaymentResult(
1✔
623
                paymentID, extractMCRoute(rt), timestamp, timestamp,
1✔
624
                newPaymentFailure(failureSourceIdx, failure),
1✔
625
        )
1✔
626

1✔
627
        return m.processPaymentResult(result)
1✔
628
}
1✔
629

630
// ReportPaymentSuccess reports a successful payment to mission control as input
631
// for future probability estimates.
632
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
633
        rt *route.Route) error {
1✔
634

1✔
635
        timestamp := m.cfg.clock.Now()
1✔
636

1✔
637
        result := newPaymentResult(
1✔
638
                paymentID, extractMCRoute(rt), timestamp, timestamp, nil,
1✔
639
        )
1✔
640

1✔
641
        _, err := m.processPaymentResult(result)
1✔
642

1✔
643
        return err
1✔
644
}
1✔
645

646
// processPaymentResult stores a payment result in the mission control store and
647
// updates mission control's in-memory state.
648
func (m *MissionControl) processPaymentResult(result *paymentResult) (
649
        *channeldb.FailureReason, error) {
1✔
650

1✔
651
        // Store complete result in database.
1✔
652
        m.store.AddResult(result)
1✔
653

1✔
654
        m.mu.Lock()
1✔
655
        defer m.mu.Unlock()
1✔
656

1✔
657
        // Apply result to update mission control state.
1✔
658
        reason := m.applyPaymentResult(result)
1✔
659

1✔
660
        return reason, nil
1✔
661
}
1✔
662

663
// applyPaymentResult applies a payment result as input for future probability
664
// estimates. It returns a bool indicating whether this error is a final error
665
// and no further payment attempts need to be made.
666
func (m *MissionControl) applyPaymentResult(
667
        result *paymentResult) *channeldb.FailureReason {
1✔
668

1✔
669
        // Interpret result.
1✔
670
        i := interpretResult(&result.route.Val, result.failure.ValOpt())
1✔
671

1✔
672
        if i.policyFailure != nil {
2✔
673
                if m.state.requestSecondChance(
1✔
674
                        time.Unix(0, int64(result.timeReply.Val)),
1✔
675
                        i.policyFailure.From, i.policyFailure.To,
1✔
676
                ) {
2✔
677
                        return nil
1✔
678
                }
1✔
679
        }
680

681
        // If there is a node-level failure, record a failure for every tried
682
        // connection of that node. A node-level failure can be considered as a
683
        // failure that would have occurred with any of the node's channels.
684
        //
685
        // Ideally we'd also record the failure for the untried connections of
686
        // the node. Unfortunately this would require access to the graph and
687
        // adding this dependency and db calls does not outweigh the benefits.
688
        //
689
        // Untried connections will fall back to the node probability. After the
690
        // call to setAllPairResult below, the node probability will be equal to
691
        // the probability of the tried channels except that the a priori
692
        // probability is mixed in too. This effect is controlled by the
693
        // aprioriWeight parameter. If that parameter isn't set to an extreme
694
        // and there are a few known connections, there shouldn't be much of a
695
        // difference. The largest difference occurs when aprioriWeight is 1. In
696
        // that case, a node-level failure would not be applied to untried
697
        // channels.
698
        if i.nodeFailure != nil {
1✔
UNCOV
699
                m.log.Debugf("Reporting node failure to Mission Control: "+
×
UNCOV
700
                        "node=%v", *i.nodeFailure)
×
UNCOV
701

×
UNCOV
702
                m.state.setAllFail(
×
UNCOV
703
                        *i.nodeFailure,
×
UNCOV
704
                        time.Unix(0, int64(result.timeReply.Val)),
×
UNCOV
705
                )
×
UNCOV
706
        }
×
707

708
        for pair, pairResult := range i.pairResults {
2✔
709
                pairResult := pairResult
1✔
710

1✔
711
                if pairResult.success {
2✔
712
                        m.log.Debugf("Reporting pair success to Mission "+
1✔
713
                                "Control: pair=%v, amt=%v",
1✔
714
                                pair, pairResult.amt)
1✔
715
                } else {
2✔
716
                        m.log.Debugf("Reporting pair failure to Mission "+
1✔
717
                                "Control: pair=%v, amt=%v",
1✔
718
                                pair, pairResult.amt)
1✔
719
                }
1✔
720

721
                m.state.setLastPairResult(
1✔
722
                        pair.From, pair.To,
1✔
723
                        time.Unix(0, int64(result.timeReply.Val)), &pairResult,
1✔
724
                        false,
1✔
725
                )
1✔
726
        }
727

728
        return i.finalFailureReason
1✔
729
}
730

731
// namespacedDB is an implementation of the missionControlDB that gives a user
// of the interface access to a namespaced bucket within the top level mission
// control bucket.
type namespacedDB struct {
	// topLevelBucketKey is the key of the top level mission control
	// bucket that contains all namespaced buckets.
	topLevelBucketKey []byte

	// namespace is the key of this instance's bucket inside the top level
	// bucket.
	namespace []byte

	// db is the backend on which all transactions are executed.
	db kvdb.Backend
}
739

740
// A compile-time check to ensure that *namespacedDB implements the
// missionControlDB interface.
var _ missionControlDB = (*namespacedDB)(nil)
742

743
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
744
// default namespace.
UNCOV
745
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
×
UNCOV
746
        return newNamespacedDB(db, DefaultMissionControlNamespace)
×
UNCOV
747
}
×
748

749
// newNamespacedDB creates a new instance of missionControlDB where the DB will
750
// have access to a namespaced bucket within the top level mission control
751
// bucket.
752
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
1✔
753
        return &namespacedDB{
1✔
754
                db:                db,
1✔
755
                namespace:         []byte(namespace),
1✔
756
                topLevelBucketKey: resultsKey,
1✔
757
        }
1✔
758
}
1✔
759

760
// update can be used to perform reads and writes on the given bucket.
761
//
762
// NOTE: this is part of the missionControlDB interface.
763
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
764
        reset func()) error {
1✔
765

1✔
766
        return n.db.Update(func(tx kvdb.RwTx) error {
2✔
767
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
1✔
768
                if err != nil {
1✔
769
                        return fmt.Errorf("cannot create top level mission "+
×
770
                                "control bucket: %w", err)
×
771
                }
×
772

773
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
1✔
774
                        n.namespace,
1✔
775
                )
1✔
776
                if err != nil {
1✔
777
                        return fmt.Errorf("cannot create namespaced bucket "+
×
778
                                "(%s) in mission control store: %w",
×
779
                                n.namespace, err)
×
780
                }
×
781

782
                return f(namespacedBkt)
1✔
783
        }, reset)
784
}
785

786
// view can be used to perform reads on the given bucket.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
790
        reset func()) error {
1✔
791

1✔
792
        return n.db.View(func(tx kvdb.RTx) error {
2✔
793
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
1✔
794
                if mcStoreBkt == nil {
1✔
795
                        return fmt.Errorf("top level mission control bucket " +
×
796
                                "not found")
×
797
                }
×
798

799
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
1✔
800
                if namespacedBkt == nil {
1✔
801
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
802
                                "in mission control store", n.namespace)
×
803
                }
×
804

805
                return f(namespacedBkt)
1✔
806
        }, reset)
807
}
808

809
// purge will delete all the contents in the namespace.
810
//
811
// NOTE: this is part of the missionControlDB interface.
812
func (n *namespacedDB) purge() error {
1✔
813
        return n.db.Update(func(tx kvdb.RwTx) error {
2✔
814
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
1✔
815
                if mcStoreBkt == nil {
1✔
816
                        return nil
×
817
                }
×
818

819
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
1✔
820
                if err != nil {
1✔
821
                        return err
×
822
                }
×
823

824
                _, err = mcStoreBkt.CreateBucket(n.namespace)
1✔
825

1✔
826
                return err
1✔
827
        }, func() {})
1✔
828
}
829

830
// paymentFailure represents the presence of a payment failure. It may or may
831
// not include additional information about said failure.
832
type paymentFailure struct {
833
        info tlv.OptionalRecordT[tlv.TlvType0, paymentFailureInfo]
834
}
835

836
// newPaymentFailure constructs a new paymentFailure struct. If the source
837
// index is nil, then an empty paymentFailure is returned. This represents a
838
// failure with unknown details. Otherwise, the index and failure message are
839
// used to populate the info field of the paymentFailure.
840
func newPaymentFailure(sourceIdx *int,
841
        failureMsg lnwire.FailureMessage) *paymentFailure {
1✔
842

1✔
843
        if sourceIdx == nil {
1✔
UNCOV
844
                return &paymentFailure{}
×
UNCOV
845
        }
×
846

847
        info := paymentFailureInfo{
1✔
848
                sourceIdx: tlv.NewPrimitiveRecord[tlv.TlvType0](
1✔
849
                        uint8(*sourceIdx),
1✔
850
                ),
1✔
851
                msg: tlv.NewRecordT[tlv.TlvType1](failureMessage{failureMsg}),
1✔
852
        }
1✔
853

1✔
854
        return &paymentFailure{
1✔
855
                info: tlv.SomeRecordT(tlv.NewRecordT[tlv.TlvType0](info)),
1✔
856
        }
1✔
857
}
858

859
// Record returns a TLV record that can be used to encode/decode a
860
// paymentFailure to/from a TLV stream.
861
func (r *paymentFailure) Record() tlv.Record {
1✔
862
        recordSize := func() uint64 {
2✔
863
                var (
1✔
864
                        b   bytes.Buffer
1✔
865
                        buf [8]byte
1✔
866
                )
1✔
867
                if err := encodePaymentFailure(&b, r, &buf); err != nil {
1✔
868
                        panic(err)
×
869
                }
870

871
                return uint64(len(b.Bytes()))
1✔
872
        }
873

874
        return tlv.MakeDynamicRecord(
1✔
875
                0, r, recordSize, encodePaymentFailure, decodePaymentFailure,
1✔
876
        )
1✔
877
}
878

879
func encodePaymentFailure(w io.Writer, val interface{}, _ *[8]byte) error {
1✔
880
        if v, ok := val.(*paymentFailure); ok {
2✔
881
                var recordProducers []tlv.RecordProducer
1✔
882
                v.info.WhenSome(
1✔
883
                        func(r tlv.RecordT[tlv.TlvType0, paymentFailureInfo]) {
2✔
884
                                recordProducers = append(recordProducers, &r)
1✔
885
                        },
1✔
886
                )
887

888
                return lnwire.EncodeRecordsTo(
1✔
889
                        w, lnwire.ProduceRecordsSorted(recordProducers...),
1✔
890
                )
1✔
891
        }
892

893
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailure")
×
894
}
895

896
func decodePaymentFailure(r io.Reader, val interface{}, _ *[8]byte,
897
        l uint64) error {
1✔
898

1✔
899
        if v, ok := val.(*paymentFailure); ok {
2✔
900
                var h paymentFailure
1✔
901

1✔
902
                info := tlv.ZeroRecordT[tlv.TlvType0, paymentFailureInfo]()
1✔
903
                typeMap, err := lnwire.DecodeRecords(
1✔
904
                        r, lnwire.ProduceRecordsSorted(&info)...,
1✔
905
                )
1✔
906
                if err != nil {
1✔
907
                        return err
×
908
                }
×
909

910
                if _, ok := typeMap[h.info.TlvType()]; ok {
2✔
911
                        h.info = tlv.SomeRecordT(info)
1✔
912
                }
1✔
913

914
                *v = h
1✔
915

1✔
916
                return nil
1✔
917
        }
918

919
        return tlv.NewTypeForDecodingErr(val, "routing.paymentFailure", l, l)
×
920
}
921

922
// paymentFailureInfo holds additional information about a payment failure.
923
type paymentFailureInfo struct {
924
        sourceIdx tlv.RecordT[tlv.TlvType0, uint8]
925
        msg       tlv.RecordT[tlv.TlvType1, failureMessage]
926
}
927

928
// Record returns a TLV record that can be used to encode/decode a
929
// paymentFailureInfo to/from a TLV stream.
930
func (r *paymentFailureInfo) Record() tlv.Record {
1✔
931
        recordSize := func() uint64 {
2✔
932
                var (
1✔
933
                        b   bytes.Buffer
1✔
934
                        buf [8]byte
1✔
935
                )
1✔
936
                if err := encodePaymentFailureInfo(&b, r, &buf); err != nil {
1✔
937
                        panic(err)
×
938
                }
939

940
                return uint64(len(b.Bytes()))
1✔
941
        }
942

943
        return tlv.MakeDynamicRecord(
1✔
944
                0, r, recordSize, encodePaymentFailureInfo,
1✔
945
                decodePaymentFailureInfo,
1✔
946
        )
1✔
947
}
948

949
func encodePaymentFailureInfo(w io.Writer, val interface{}, _ *[8]byte) error {
1✔
950
        if v, ok := val.(*paymentFailureInfo); ok {
2✔
951
                return lnwire.EncodeRecordsTo(
1✔
952
                        w, lnwire.ProduceRecordsSorted(
1✔
953
                                &v.sourceIdx, &v.msg,
1✔
954
                        ),
1✔
955
                )
1✔
956
        }
1✔
957

958
        return tlv.NewTypeForEncodingErr(val, "routing.paymentFailureInfo")
×
959
}
960

961
func decodePaymentFailureInfo(r io.Reader, val interface{}, _ *[8]byte,
962
        l uint64) error {
1✔
963

1✔
964
        if v, ok := val.(*paymentFailureInfo); ok {
2✔
965
                var h paymentFailureInfo
1✔
966

1✔
967
                _, err := lnwire.DecodeRecords(
1✔
968
                        r,
1✔
969
                        lnwire.ProduceRecordsSorted(&h.sourceIdx, &h.msg)...,
1✔
970
                )
1✔
971
                if err != nil {
1✔
972
                        return err
×
973
                }
×
974

975
                *v = h
1✔
976

1✔
977
                return nil
1✔
978
        }
979

980
        return tlv.NewTypeForDecodingErr(
×
981
                val, "routing.paymentFailureInfo", l, l,
×
982
        )
×
983
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc