• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.43
/routing/missioncontrol.go
1
package routing
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btclog"
11
        "github.com/btcsuite/btcwallet/walletdb"
12
        "github.com/lightningnetwork/lnd/build"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/clock"
15
        "github.com/lightningnetwork/lnd/fn"
16
        "github.com/lightningnetwork/lnd/kvdb"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing/route"
19
)
20

21
const (
22
        // DefaultPenaltyHalfLife is the default half-life duration. The
23
        // half-life duration defines after how much time a penalized node or
24
        // channel is back at 50% probability.
25
        DefaultPenaltyHalfLife = time.Hour
26

27
        // minSecondChanceInterval is the minimum time required between
28
        // second-chance failures.
29
        //
30
        // If nodes return a channel policy related failure, they may get a
31
        // second chance to forward the payment. It could be that the channel
32
        // policy that we are aware of is not up to date. This is especially
33
        // important in case of mobile apps that are mostly offline.
34
        //
35
        // However, we don't want to give nodes the option to endlessly return
36
        // new channel updates so that we are kept busy trying to route through
37
        // that node until the payment loop times out.
38
        //
39
        // Therefore we only grant a second chance to a node if the previous
40
        // second chance is sufficiently long ago. This is what
41
        // minSecondChanceInterval defines. If a second policy failure comes in
42
        // within that interval, we will apply a penalty.
43
        //
44
        // Second chances granted are tracked on the level of node pairs. This
45
        // means that if a node has multiple channels to the same peer, they
46
        // will only get a single second chance to route to that peer again.
47
        // Nodes forward non-strict, so it isn't necessary to apply a less
48
        // restrictive channel level tracking scheme here.
49
        minSecondChanceInterval = time.Minute
50

51
        // DefaultMaxMcHistory is the default maximum history size.
52
        DefaultMaxMcHistory = 1000
53

54
        // DefaultMcFlushInterval is the default interval we use to flush MC state
55
        // to the database.
56
        DefaultMcFlushInterval = time.Second
57

58
        // prevSuccessProbability is the assumed probability for node pairs that
59
        // successfully relayed the previous attempt.
60
        prevSuccessProbability = 0.95
61

62
        // DefaultAprioriWeight is the default a priori weight. See
63
        // MissionControlConfig for further explanation.
64
        DefaultAprioriWeight = 0.5
65

66
        // DefaultMinFailureRelaxInterval is the default minimum time that must
67
        // have passed since the previously recorded failure before the failure
68
        // amount may be raised.
69
        DefaultMinFailureRelaxInterval = time.Minute
70

71
        // DefaultFeeEstimationTimeout is the default value for
72
        // FeeEstimationTimeout. It defines the maximum duration that the
73
        // probing fee estimation is allowed to take.
74
        DefaultFeeEstimationTimeout = time.Minute
75

76
        // DefaultMissionControlNamespace is the name of the default mission
77
        // control name space. This is used as the sub-bucket key within the
78
        // top level DB bucket to store mission control results.
79
        DefaultMissionControlNamespace = "default"
80
)
81

82
var (
83
        // ErrInvalidMcHistory is returned if we get a negative mission control
84
        // history count.
85
        ErrInvalidMcHistory = errors.New("mission control history must be " +
86
                ">= 0")
87

88
        // ErrInvalidFailureInterval is returned if we get an invalid failure
89
        // interval.
90
        ErrInvalidFailureInterval = errors.New("failure interval must be >= 0")
91
)
92

93
// NodeResults contains previous results from a node to its peers.
94
type NodeResults map[route.Vertex]TimedPairResult
95

96
// mcConfig holds various config members that will be required by all
97
// MissionControl instances and will be the same regardless of namespace.
98
type mcConfig struct {
99
        // clock is a time source used by mission control.
100
        clock clock.Clock
101

102
        // selfNode is our pubkey.
103
        selfNode route.Vertex
104
}
105

106
// MissionControl contains state which summarizes the past attempts of HTLC
107
// routing by external callers when sending payments throughout the network. It
108
// acts as a shared memory during routing attempts with the goal to optimize the
109
// payment attempt success rate.
110
//
111
// Failed payment attempts are reported to mission control. These reports are
112
// used to track the time of the last node or channel level failure. The time
113
// since the last failure is used to estimate a success probability that is fed
114
// into the path finding process for subsequent payment attempts.
115
type MissionControl struct {
116
        cfg *mcConfig // shared clock and self-node settings
117

118
        // state is the internal mission control state that is input for
119
        // probability estimation.
120
        state *missionControlState
121

122
        store *missionControlStore // receives every reported payment result
123

124
        // estimator is the probability estimator that is used with the payment
125
        // results that mission control collects.
126
        estimator Estimator
127

128
        // onConfigUpdate is an optional function that SetConfig calls with
129
        // the new config after it has been applied.
130
        onConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
131

132
        log btclog.Logger // logger prefixed with this instance's namespace
133

134
        mu sync.Mutex // held by the methods that touch state and estimator
135
}
136

137
// MissionController manages MissionControl instances in various namespaces.
138
type MissionController struct {
139
        db           kvdb.Backend
140
        cfg          *mcConfig
141
        defaultMCCfg *MissionControlConfig
142

143
        mc map[string]*MissionControl
144
        mu sync.Mutex
145

146
        // TODO(roasbeef): further counters, if vertex continually unavailable,
147
        // add to another generation
148

149
        // TODO(roasbeef): also add favorable metrics for nodes
150
}
151

152
// GetNamespacedStore returns the MissionControl in the given namespace. If one
153
// does not yet exist, then it is initialised.
154
func (m *MissionController) GetNamespacedStore(ns string) (*MissionControl,
155
        error) {
34✔
156

34✔
157
        m.mu.Lock()
34✔
158
        defer m.mu.Unlock()
34✔
159

34✔
160
        if mc, ok := m.mc[ns]; ok {
67✔
161
                return mc, nil
33✔
162
        }
33✔
163

164
        return m.initMissionControl(ns)
1✔
165
}
166

167
// ListNamespaces returns a list of the namespaces that the MissionController
168
// is aware of.
169
func (m *MissionController) ListNamespaces() []string {
3✔
170
        m.mu.Lock()
3✔
171
        defer m.mu.Unlock()
3✔
172

3✔
173
        namespaces := make([]string, 0, len(m.mc))
3✔
174
        for ns := range m.mc {
8✔
175
                namespaces = append(namespaces, ns)
5✔
176
        }
5✔
177

178
        return namespaces
3✔
179
}
180

181
// MissionControlConfig defines parameters that control mission control
182
// behaviour.
183
type MissionControlConfig struct {
184
        // Estimator gives probability estimates for node pairs.
185
        Estimator Estimator
186

187
        // OnConfigUpdate is an optional function that is called with the new
188
        // config whenever SetConfig applies a config update.
189
        OnConfigUpdate fn.Option[func(cfg *MissionControlConfig)]
190

191
        // MaxMcHistory defines the maximum number of payment results that are
192
        // held on disk. It must not be negative.
193
        MaxMcHistory int
194

195
        // McFlushInterval defines the ticker interval when we flush the
196
        // accumulated state to the DB.
197
        McFlushInterval time.Duration
198

199
        // MinFailureRelaxInterval is the minimum time that must have passed
200
        // since the previously recorded failure before the failure amount may
201
        // be raised. It must not be negative.
202
        MinFailureRelaxInterval time.Duration
203
}
204

205
func (c *MissionControlConfig) validate() error {
31✔
206
        if c.MaxMcHistory < 0 {
31✔
207
                return ErrInvalidMcHistory
×
208
        }
×
209

210
        if c.MinFailureRelaxInterval < 0 {
31✔
211
                return ErrInvalidFailureInterval
×
212
        }
×
213

214
        return nil
31✔
215
}
216

217
// String returns a string representation of a mission control config.
UNCOV
218
func (c *MissionControlConfig) String() string {
×
UNCOV
219
        return fmt.Sprintf("maximum history: %v, minimum failure relax "+
×
UNCOV
220
                "interval: %v", c.MaxMcHistory, c.MinFailureRelaxInterval)
×
UNCOV
221
}
×
222

223
// TimedPairResult describes a timestamped pair result.
224
type TimedPairResult struct {
225
        // FailTime is the time of the last failure.
226
        FailTime time.Time
227

228
        // FailAmt is the amount of the last failure. This amount may be pushed
229
        // up if a later success is higher than the last failed amount.
230
        FailAmt lnwire.MilliSatoshi
231

232
        // SuccessTime is the time of the last success.
233
        SuccessTime time.Time
234

235
        // SuccessAmt is the highest amount that successfully forwarded. This
236
        // isn't necessarily the last success amount. The value of this field
237
        // may also be pushed down if a later failure is lower than the highest
238
        // success amount. Because of this, SuccessAmt may not match
239
        // SuccessTime.
240
        SuccessAmt lnwire.MilliSatoshi
241
}
242

243
// MissionControlSnapshot contains a snapshot of the current state of mission
244
// control.
245
type MissionControlSnapshot struct {
246
        // Pairs is a list of channels for which specific information is
247
        // logged.
248
        Pairs []MissionControlPairSnapshot
249
}
250

251
// MissionControlPairSnapshot contains a snapshot of the current node pair
252
// state in mission control.
253
type MissionControlPairSnapshot struct {
254
        // Pair is the node pair of which the state is described.
255
        Pair DirectedNodePair
256

257
        // TimedPairResult contains the data for this pair.
258
        TimedPairResult
259
}
260

261
// paymentResult is the information that becomes available when a payment
262
// attempt completes.
263
type paymentResult struct {
264
        id                 uint64 // payment ID passed by the reporter
265
        timeFwd, timeReply time.Time // both set to the report time
266
        route              *mcRoute // route of the attempt
267
        success            bool // true for a successful attempt
268
        failureSourceIdx   *int // failure source; nil means unknown
269
        failure            lnwire.FailureMessage // failure reported, if any
270
}
271

272
// NewMissionController returns a new instance of MissionController.
273
func NewMissionController(db kvdb.Backend, self route.Vertex,
274
        cfg *MissionControlConfig) (*MissionController, error) {
31✔
275

31✔
276
        log.Debugf("Instantiating mission control with config: %v, %v", cfg,
31✔
277
                cfg.Estimator)
31✔
278

31✔
279
        if err := cfg.validate(); err != nil {
31✔
280
                return nil, err
×
281
        }
×
282

283
        mcCfg := &mcConfig{
31✔
284
                clock:    clock.NewDefaultClock(),
31✔
285
                selfNode: self,
31✔
286
        }
31✔
287

31✔
288
        mgr := &MissionController{
31✔
289
                db:           db,
31✔
290
                defaultMCCfg: cfg,
31✔
291
                cfg:          mcCfg,
31✔
292
                mc:           make(map[string]*MissionControl),
31✔
293
        }
31✔
294

31✔
295
        if err := mgr.loadMissionControls(); err != nil {
31✔
296
                return nil, err
×
297
        }
×
298

299
        for _, mc := range mgr.mc {
63✔
300
                if err := mc.init(); err != nil {
32✔
301
                        return nil, err
×
302
                }
×
303
        }
304

305
        return mgr, nil
31✔
306
}
307

308
// loadMissionControls initialises a MissionControl in the default namespace if
309
// one does not yet exist. It then initialises a MissionControl for all other
310
// namespaces found in the DB.
311
//
312
// NOTE: this should only be called once during MissionController construction.
313
func (m *MissionController) loadMissionControls() error {
31✔
314
        m.mu.Lock()
31✔
315
        defer m.mu.Unlock()
31✔
316

31✔
317
        // Always initialise the default namespace.
31✔
318
        _, err := m.initMissionControl(DefaultMissionControlNamespace)
31✔
319
        if err != nil {
31✔
320
                return err
×
321
        }
×
322

323
        namespaces := make(map[string]struct{})
31✔
324
        err = m.db.View(func(tx walletdb.ReadTx) error {
62✔
325
                mcStoreBkt := tx.ReadBucket(resultsKey)
31✔
326
                if mcStoreBkt == nil {
31✔
327
                        return fmt.Errorf("top level mission control bucket " +
×
328
                                "not found")
×
329
                }
×
330

331
                // Iterate through all the keys in the bucket and collect the
332
                // namespaces.
333
                return mcStoreBkt.ForEach(func(k, _ []byte) error {
63✔
334
                        // We've already initialised the default namespace so
32✔
335
                        // we can skip it.
32✔
336
                        if string(k) == DefaultMissionControlNamespace {
63✔
337
                                return nil
31✔
338
                        }
31✔
339

340
                        namespaces[string(k)] = struct{}{}
1✔
341

1✔
342
                        return nil
1✔
343
                })
344
        }, func() {})
31✔
345
        if err != nil {
31✔
346
                return err
×
347
        }
×
348

349
        // Now, iterate through all the namespaces and initialise them.
350
        for ns := range namespaces {
32✔
351
                _, err = m.initMissionControl(ns)
1✔
352
                if err != nil {
1✔
353
                        return err
×
354
                }
×
355
        }
356

357
        return nil
31✔
358
}
359

360
// initMissionControl creates a new MissionControl instance with the given
361
// namespace if one does not yet exist.
362
//
363
// NOTE: the MissionController's mutex must be held before calling this method.
364
func (m *MissionController) initMissionControl(namespace string) (
365
        *MissionControl, error) {
33✔
366

33✔
367
        // If a mission control with this namespace has already been initialised
33✔
368
        // then there is nothing left to do.
33✔
369
        if mc, ok := m.mc[namespace]; ok {
33✔
370
                return mc, nil
×
371
        }
×
372

373
        cfg := m.defaultMCCfg
33✔
374

33✔
375
        store, err := newMissionControlStore(
33✔
376
                newNamespacedDB(m.db, namespace), cfg.MaxMcHistory,
33✔
377
                cfg.McFlushInterval,
33✔
378
        )
33✔
379
        if err != nil {
33✔
380
                return nil, err
×
381
        }
×
382

383
        mc := &MissionControl{
33✔
384
                cfg:       m.cfg,
33✔
385
                state:     newMissionControlState(cfg.MinFailureRelaxInterval),
33✔
386
                store:     store,
33✔
387
                estimator: cfg.Estimator,
33✔
388
                log: build.NewPrefixLog(
33✔
389
                        fmt.Sprintf("[%s]:", namespace), log,
33✔
390
                ),
33✔
391
                onConfigUpdate: cfg.OnConfigUpdate,
33✔
392
        }
33✔
393

33✔
394
        m.mc[namespace] = mc
33✔
395

33✔
396
        return mc, nil
33✔
397
}
398

399
// RunStoreTickers runs the mission controller store's tickers.
UNCOV
400
func (m *MissionController) RunStoreTickers() {
×
UNCOV
401
        m.mu.Lock()
×
UNCOV
402
        defer m.mu.Unlock()
×
UNCOV
403

×
UNCOV
404
        for _, mc := range m.mc {
×
UNCOV
405
                mc.store.run()
×
UNCOV
406
        }
×
407
}
408

409
// StopStoreTickers stops the mission control store's tickers.
UNCOV
410
func (m *MissionController) StopStoreTickers() {
×
UNCOV
411
        log.Debug("Stopping mission control store ticker")
×
UNCOV
412
        defer log.Debug("Mission control store ticker stopped")
×
UNCOV
413

×
UNCOV
414
        m.mu.Lock()
×
UNCOV
415
        defer m.mu.Unlock()
×
UNCOV
416

×
UNCOV
417
        for _, mc := range m.mc {
×
UNCOV
418
                mc.store.stop()
×
UNCOV
419
        }
×
420
}
421

422
// init initializes mission control with historical data.
423
func (m *MissionControl) init() error {
32✔
424
        m.log.Debugf("Mission control state reconstruction started")
32✔
425

32✔
426
        m.mu.Lock()
32✔
427
        defer m.mu.Unlock()
32✔
428

32✔
429
        start := time.Now()
32✔
430

32✔
431
        results, err := m.store.fetchAll()
32✔
432
        if err != nil {
32✔
433
                return err
×
434
        }
×
435

436
        for _, result := range results {
36✔
437
                _ = m.applyPaymentResult(result)
4✔
438
        }
4✔
439

440
        m.log.Debugf("Mission control state reconstruction finished: "+
32✔
441
                "n=%v, time=%v", len(results), time.Since(start))
32✔
442

32✔
443
        return nil
32✔
444
}
445

446
// GetConfig returns the config that mission control is currently configured
447
// with. All fields are copied by value, so we do not need to worry about
448
// mutation.
UNCOV
449
func (m *MissionControl) GetConfig() *MissionControlConfig {
×
UNCOV
450
        m.mu.Lock()
×
UNCOV
451
        defer m.mu.Unlock()
×
UNCOV
452

×
UNCOV
453
        return &MissionControlConfig{
×
UNCOV
454
                Estimator:               m.estimator,
×
UNCOV
455
                MaxMcHistory:            m.store.maxRecords,
×
UNCOV
456
                McFlushInterval:         m.store.flushInterval,
×
UNCOV
457
                MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
×
UNCOV
458
        }
×
UNCOV
459
}
×
460

461
// SetConfig validates the config provided and updates mission control's config
462
// if it is valid.
UNCOV
463
func (m *MissionControl) SetConfig(cfg *MissionControlConfig) error {
×
UNCOV
464
        if cfg == nil {
×
465
                return errors.New("nil mission control config")
×
466
        }
×
467

UNCOV
468
        if err := cfg.validate(); err != nil {
×
469
                return err
×
470
        }
×
471

UNCOV
472
        m.mu.Lock()
×
UNCOV
473
        defer m.mu.Unlock()
×
UNCOV
474

×
UNCOV
475
        m.log.Infof("Active mission control cfg: %v, estimator: %v", cfg,
×
UNCOV
476
                cfg.Estimator)
×
UNCOV
477

×
UNCOV
478
        m.store.maxRecords = cfg.MaxMcHistory
×
UNCOV
479
        m.state.minFailureRelaxInterval = cfg.MinFailureRelaxInterval
×
UNCOV
480
        m.estimator = cfg.Estimator
×
UNCOV
481

×
UNCOV
482
        // Execute the callback function if it is set.
×
UNCOV
483
        m.onConfigUpdate.WhenSome(func(f func(cfg *MissionControlConfig)) {
×
UNCOV
484
                f(cfg)
×
UNCOV
485
        })
×
486

UNCOV
487
        return nil
×
488
}
489

490
// ResetHistory resets the history of MissionControl returning it to a state as
491
// if no payment attempts have been made.
492
func (m *MissionControl) ResetHistory() error {
3✔
493
        m.mu.Lock()
3✔
494
        defer m.mu.Unlock()
3✔
495

3✔
496
        if err := m.store.clear(); err != nil {
3✔
497
                return err
×
498
        }
×
499

500
        m.state.resetHistory()
3✔
501

3✔
502
        m.log.Debugf("Mission control history cleared")
3✔
503

3✔
504
        return nil
3✔
505
}
506

507
// GetProbability is expected to return the success probability of a payment
508
// from fromNode along edge.
509
func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
510
        amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 {
662✔
511

662✔
512
        m.mu.Lock()
662✔
513
        defer m.mu.Unlock()
662✔
514

662✔
515
        now := m.cfg.clock.Now()
662✔
516
        results, _ := m.state.getLastPairResult(fromNode)
662✔
517

662✔
518
        // Use a distinct probability estimation function for local channels.
662✔
519
        if fromNode == m.cfg.selfNode {
701✔
520
                return m.estimator.LocalPairProbability(now, results, toNode)
39✔
521
        }
39✔
522

523
        return m.estimator.PairProbability(
623✔
524
                now, results, toNode, amt, capacity,
623✔
525
        )
623✔
526
}
527

528
// GetHistorySnapshot takes a snapshot from the current mission control state
529
// and actual probability estimates.
530
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
1✔
531
        m.mu.Lock()
1✔
532
        defer m.mu.Unlock()
1✔
533

1✔
534
        m.log.Debugf("Requesting history snapshot from mission control")
1✔
535

1✔
536
        return m.state.getSnapshot()
1✔
537
}
1✔
538

539
// ImportHistory imports the set of mission control results provided to our
540
// in-memory state. These results are not persisted, so will not survive
541
// restarts.
542
func (m *MissionControl) ImportHistory(history *MissionControlSnapshot,
UNCOV
543
        force bool) error {
×
UNCOV
544

×
UNCOV
545
        if history == nil {
×
546
                return errors.New("cannot import nil history")
×
547
        }
×
548

UNCOV
549
        m.mu.Lock()
×
UNCOV
550
        defer m.mu.Unlock()
×
UNCOV
551

×
UNCOV
552
        m.log.Infof("Importing history snapshot with %v pairs to mission "+
×
UNCOV
553
                "control", len(history.Pairs))
×
UNCOV
554

×
UNCOV
555
        imported := m.state.importSnapshot(history, force)
×
UNCOV
556

×
UNCOV
557
        m.log.Infof("Imported %v results to mission control", imported)
×
UNCOV
558

×
UNCOV
559
        return nil
×
560
}
561

562
// GetPairHistorySnapshot returns the stored history for a given node pair.
563
func (m *MissionControl) GetPairHistorySnapshot(
UNCOV
564
        fromNode, toNode route.Vertex) TimedPairResult {
×
UNCOV
565

×
UNCOV
566
        m.mu.Lock()
×
UNCOV
567
        defer m.mu.Unlock()
×
UNCOV
568

×
UNCOV
569
        results, ok := m.state.getLastPairResult(fromNode)
×
UNCOV
570
        if !ok {
×
UNCOV
571
                return TimedPairResult{}
×
UNCOV
572
        }
×
573

UNCOV
574
        result, ok := results[toNode]
×
UNCOV
575
        if !ok {
×
UNCOV
576
                return TimedPairResult{}
×
UNCOV
577
        }
×
578

UNCOV
579
        return result
×
580
}
581

582
// ReportPaymentFail reports a failed payment to mission control as input for
583
// future probability estimates. The failureSourceIdx argument indicates the
584
// failure source. If it is nil, the failure source is unknown. This function
585
// returns a reason if this failure is a final failure. In that case no further
586
// payment attempts need to be made.
587
func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route,
588
        failureSourceIdx *int, failure lnwire.FailureMessage) (
589
        *channeldb.FailureReason, error) {
46✔
590

46✔
591
        timestamp := m.cfg.clock.Now()
46✔
592

46✔
593
        result := &paymentResult{
46✔
594
                success:          false,
46✔
595
                timeFwd:          timestamp,
46✔
596
                timeReply:        timestamp,
46✔
597
                id:               paymentID,
46✔
598
                failureSourceIdx: failureSourceIdx,
46✔
599
                failure:          failure,
46✔
600
                route:            extractMCRoute(rt),
46✔
601
        }
46✔
602

46✔
603
        return m.processPaymentResult(result)
46✔
604
}
46✔
605

606
// ReportPaymentSuccess reports a successful payment to mission control as input
607
// for future probability estimates.
608
func (m *MissionControl) ReportPaymentSuccess(paymentID uint64,
609
        rt *route.Route) error {
22✔
610

22✔
611
        timestamp := m.cfg.clock.Now()
22✔
612

22✔
613
        result := &paymentResult{
22✔
614
                timeFwd:   timestamp,
22✔
615
                timeReply: timestamp,
22✔
616
                id:        paymentID,
22✔
617
                success:   true,
22✔
618
                route:     extractMCRoute(rt),
22✔
619
        }
22✔
620

22✔
621
        _, err := m.processPaymentResult(result)
22✔
622
        return err
22✔
623
}
22✔
624

625
// processPaymentResult stores a payment result in the mission control store and
626
// updates mission control's in-memory state.
627
func (m *MissionControl) processPaymentResult(result *paymentResult) (
628
        *channeldb.FailureReason, error) {
68✔
629

68✔
630
        // Store complete result in database.
68✔
631
        m.store.AddResult(result)
68✔
632

68✔
633
        m.mu.Lock()
68✔
634
        defer m.mu.Unlock()
68✔
635

68✔
636
        // Apply result to update mission control state.
68✔
637
        reason := m.applyPaymentResult(result)
68✔
638

68✔
639
        return reason, nil
68✔
640
}
68✔
641

642
// applyPaymentResult applies a payment result as input for future probability
643
// estimates. It returns a bool indicating whether this error is a final error
644
// and no further payment attempts need to be made.
645
func (m *MissionControl) applyPaymentResult(
646
        result *paymentResult) *channeldb.FailureReason {
72✔
647

72✔
648
        // Interpret result.
72✔
649
        i := interpretResult(
72✔
650
                result.route, result.success, result.failureSourceIdx,
72✔
651
                result.failure,
72✔
652
        )
72✔
653

72✔
654
        if i.policyFailure != nil {
81✔
655
                if m.state.requestSecondChance(
9✔
656
                        result.timeReply,
9✔
657
                        i.policyFailure.From, i.policyFailure.To,
9✔
658
                ) {
15✔
659
                        return nil
6✔
660
                }
6✔
661
        }
662

663
        // If there is a node-level failure, record a failure for every tried
664
        // connection of that node. A node-level failure can be considered as a
665
        // failure that would have occurred with any of the node's channels.
666
        //
667
        // Ideally we'd also record the failure for the untried connections of
668
        // the node. Unfortunately this would require access to the graph and
669
        // adding this dependency and db calls does not outweigh the benefits.
670
        //
671
        // Untried connections will fall back to the node probability. After the
672
        // call to setAllPairResult below, the node probability will be equal to
673
        // the probability of the tried channels except that the a priori
674
        // probability is mixed in too. This effect is controlled by the
675
        // aprioriWeight parameter. If that parameter isn't set to an extreme
676
        // and there are a few known connections, there shouldn't be much of a
677
        // difference. The largest difference occurs when aprioriWeight is 1. In
678
        // that case, a node-level failure would not be applied to untried
679
        // channels.
680
        if i.nodeFailure != nil {
72✔
681
                m.log.Debugf("Reporting node failure to Mission Control: "+
6✔
682
                        "node=%v", *i.nodeFailure)
6✔
683

6✔
684
                m.state.setAllFail(*i.nodeFailure, result.timeReply)
6✔
685
        }
6✔
686

687
        for pair, pairResult := range i.pairResults {
212✔
688
                pairResult := pairResult
146✔
689

146✔
690
                if pairResult.success {
227✔
691
                        m.log.Debugf("Reporting pair success to Mission "+
81✔
692
                                "Control: pair=%v, amt=%v",
81✔
693
                                pair, pairResult.amt)
81✔
694
                } else {
146✔
695
                        m.log.Debugf("Reporting pair failure to Mission "+
65✔
696
                                "Control: pair=%v, amt=%v",
65✔
697
                                pair, pairResult.amt)
65✔
698
                }
65✔
699

700
                m.state.setLastPairResult(
146✔
701
                        pair.From, pair.To, result.timeReply, &pairResult, false,
146✔
702
                )
146✔
703
        }
704

705
        return i.finalFailureReason
66✔
706
}
707

708
// namespacedDB is an implementation of the missionControlDB that gives a user
709
// of the interface access to a namespaced bucket within the top level mission
710
// control bucket.
711
type namespacedDB struct {
712
        topLevelBucketKey []byte // key of the top level mission control bucket
713
        namespace         []byte // key of the namespaced sub-bucket
714
        db                kvdb.Backend // backend that holds both buckets
715
}
716

717
// A compile-time check to ensure that namespacedDB implements missionControlDB.
718
var _ missionControlDB = (*namespacedDB)(nil)
719

720
// newDefaultNamespacedStore creates an instance of namespaceDB that uses the
721
// default namespace.
722
func newDefaultNamespacedStore(db kvdb.Backend) missionControlDB {
4✔
723
        return newNamespacedDB(db, DefaultMissionControlNamespace)
4✔
724
}
4✔
725

726
// newNamespacedDB creates a new instance of missionControlDB where the DB will
727
// have access to a namespaced bucket within the top level mission control
728
// bucket.
729
func newNamespacedDB(db kvdb.Backend, namespace string) missionControlDB {
37✔
730
        return &namespacedDB{
37✔
731
                db:                db,
37✔
732
                namespace:         []byte(namespace),
37✔
733
                topLevelBucketKey: resultsKey,
37✔
734
        }
37✔
735
}
37✔
736

737
// update can be used to perform reads and writes on the given bucket.
738
//
739
// NOTE: this is part of the missionControlDB interface.
740
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
741
        reset func()) error {
45✔
742

45✔
743
        return n.db.Update(func(tx kvdb.RwTx) error {
90✔
744
                mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
45✔
745
                if err != nil {
45✔
746
                        return fmt.Errorf("cannot create top level mission "+
×
747
                                "control bucket: %w", err)
×
748
                }
×
749

750
                namespacedBkt, err := mcStoreBkt.CreateBucketIfNotExists(
45✔
751
                        n.namespace,
45✔
752
                )
45✔
753
                if err != nil {
45✔
754
                        return fmt.Errorf("cannot create namespaced bucket "+
×
755
                                "(%s) in mission control store: %w",
×
756
                                n.namespace, err)
×
757
                }
×
758

759
                return f(namespacedBkt)
45✔
760
        }, reset)
761
}
762

763
// view can be used to perform reads on the given bucket.
764
//
765
// NOTE: this is part of the missionControlDB interface.
766
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
767
        reset func()) error {
46✔
768

46✔
769
        return n.db.View(func(tx kvdb.RTx) error {
92✔
770
                mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
46✔
771
                if mcStoreBkt == nil {
46✔
772
                        return fmt.Errorf("top level mission control bucket " +
×
773
                                "not found")
×
774
                }
×
775

776
                namespacedBkt := mcStoreBkt.NestedReadBucket(n.namespace)
46✔
777
                if namespacedBkt == nil {
46✔
778
                        return fmt.Errorf("namespaced bucket (%s) not found "+
×
779
                                "in mission control store", n.namespace)
×
780
                }
×
781

782
                return f(namespacedBkt)
46✔
783
        }, reset)
784
}
785

786
// purge will delete all the contents in the namespace.
787
//
788
// NOTE: this is part of the missionControlDB interface.
789
func (n *namespacedDB) purge() error {
3✔
790
        return n.db.Update(func(tx kvdb.RwTx) error {
6✔
791
                mcStoreBkt := tx.ReadWriteBucket(n.topLevelBucketKey)
3✔
792
                if mcStoreBkt == nil {
3✔
793
                        return nil
×
794
                }
×
795

796
                err := mcStoreBkt.DeleteNestedBucket(n.namespace)
3✔
797
                if err != nil {
3✔
798
                        return err
×
799
                }
×
800

801
                _, err = mcStoreBkt.CreateBucket(n.namespace)
3✔
802

3✔
803
                return err
3✔
804
        }, func() {})
3✔
805
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc