• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context provided by its new main WithCtx method is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived contexts, the quit channel _must_ be internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method, which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.88
/routing/payment_lifecycle.go
1
package routing
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/davecgh/go-spew/spew"
11
        sphinx "github.com/lightningnetwork/lightning-onion"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/graph/db/models"
15
        "github.com/lightningnetwork/lnd/htlcswitch"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing/route"
19
        "github.com/lightningnetwork/lnd/routing/shards"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
// ErrPaymentLifecycleExiting is returned when waiting for an HTLC attempt
// result is aborted because the payment lifecycle is exiting.
var ErrPaymentLifecycleExiting = errors.New("payment lifecycle exiting")
26

27
// paymentLifecycle holds all information about the current state of a payment
28
// needed to resume if from any point.
29
type paymentLifecycle struct {
30
        router                *ChannelRouter
31
        feeLimit              lnwire.MilliSatoshi
32
        identifier            lntypes.Hash
33
        paySession            PaymentSession
34
        shardTracker          shards.ShardTracker
35
        currentHeight         int32
36
        firstHopCustomRecords lnwire.CustomRecords
37

38
        // quit is closed to signal the sub goroutines of the payment lifecycle
39
        // to stop.
40
        quit chan struct{}
41

42
        // resultCollected is used to signal that the result of an attempt has
43
        // been collected. A nil error means the attempt is either successful
44
        // or failed with temporary error. Otherwise, we should exit the
45
        // lifecycle loop as a terminal error has occurred.
46
        resultCollected chan error
47

48
        // resultCollector is a function that is used to collect the result of
49
        // an HTLC attempt, which is always mounted to `p.collectResultAsync`
50
        // except in unit test, where we use a much simpler resultCollector to
51
        // decouple the test flow for the payment lifecycle.
52
        resultCollector func(attempt *channeldb.HTLCAttempt)
53
}
54

55
// newPaymentLifecycle initiates a new payment lifecycle and returns it.
56
func newPaymentLifecycle(r *ChannelRouter, feeLimit lnwire.MilliSatoshi,
57
        identifier lntypes.Hash, paySession PaymentSession,
58
        shardTracker shards.ShardTracker, currentHeight int32,
59
        firstHopCustomRecords lnwire.CustomRecords) *paymentLifecycle {
40✔
60

40✔
61
        p := &paymentLifecycle{
40✔
62
                router:                r,
40✔
63
                feeLimit:              feeLimit,
40✔
64
                identifier:            identifier,
40✔
65
                paySession:            paySession,
40✔
66
                shardTracker:          shardTracker,
40✔
67
                currentHeight:         currentHeight,
40✔
68
                quit:                  make(chan struct{}),
40✔
69
                resultCollected:       make(chan error, 1),
40✔
70
                firstHopCustomRecords: firstHopCustomRecords,
40✔
71
        }
40✔
72

40✔
73
        // Mount the result collector.
40✔
74
        p.resultCollector = p.collectResultAsync
40✔
75

40✔
76
        return p
40✔
77
}
40✔
78

79
// calcFeeBudget returns the available fee to be used for sending HTLC
80
// attempts.
81
func (p *paymentLifecycle) calcFeeBudget(
82
        feesPaid lnwire.MilliSatoshi) lnwire.MilliSatoshi {
103✔
83

103✔
84
        budget := p.feeLimit
103✔
85

103✔
86
        // We'll subtract the used fee from our fee budget. In case of
103✔
87
        // overflow, we need to check whether feesPaid exceeds our budget
103✔
88
        // already.
103✔
89
        if feesPaid <= budget {
206✔
90
                budget -= feesPaid
103✔
91
        } else {
103✔
92
                budget = 0
×
93
        }
×
94

95
        return budget
103✔
96
}
97

98
// stateStep defines an action to be taken in our payment lifecycle. We either
// skip to the next iteration, proceed with the current one, or exit the
// lifecycle, see details below.
type stateStep uint8

const (
	// stepSkip is used when we need to skip the rest of the current
	// lifecycle iteration and jump to the next one.
	stepSkip stateStep = iota

	// stepProceed is used when we can proceed with the current lifecycle
	// iteration and make a new HTLC attempt.
	stepProceed

	// stepExit is used when we need to exit the lifecycle.
	stepExit
)
113

114
// decideNextStep is used to determine the next step in the payment lifecycle.
115
func (p *paymentLifecycle) decideNextStep(
116
        payment DBMPPayment) (stateStep, error) {
74✔
117

74✔
118
        // Check whether we could make new HTLC attempts.
74✔
119
        allow, err := payment.AllowMoreAttempts()
74✔
120
        if err != nil {
76✔
121
                return stepExit, err
2✔
122
        }
2✔
123

124
        if !allow {
113✔
125
                // Check whether we need to wait for results.
41✔
126
                wait, err := payment.NeedWaitAttempts()
41✔
127
                if err != nil {
42✔
128
                        return stepExit, err
1✔
129
                }
1✔
130

131
                // If we are not allowed to make new HTLC attempts and there's
132
                // no need to wait, the lifecycle is done and we can exit.
133
                if !wait {
57✔
134
                        return stepExit, nil
17✔
135
                }
17✔
136

137
                log.Tracef("Waiting for attempt results for payment %v",
23✔
138
                        p.identifier)
23✔
139

23✔
140
                // Otherwise we wait for one HTLC attempt then continue
23✔
141
                // the lifecycle.
23✔
142
                //
23✔
143
                // NOTE: we don't check `p.quit` since `decideNextStep` is
23✔
144
                // running in the same goroutine as `resumePayment`.
23✔
145
                select {
23✔
146
                case err := <-p.resultCollected:
22✔
147
                        // If an error is returned, exit with it.
22✔
148
                        if err != nil {
22✔
149
                                return stepExit, err
×
150
                        }
×
151

152
                        log.Tracef("Received attempt result for payment %v",
22✔
153
                                p.identifier)
22✔
154

155
                case <-p.router.quit:
1✔
156
                        return stepExit, ErrRouterShuttingDown
1✔
157
                }
158

159
                return stepSkip, nil
22✔
160
        }
161

162
        // Otherwise we need to make more attempts.
163
        return stepProceed, nil
31✔
164
}
165

166
// resumePayment resumes the paymentLifecycle from the current state.
167
func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte,
168
        *route.Route, error) {
22✔
169

22✔
170
        // When the payment lifecycle loop exits, we make sure to signal any
22✔
171
        // sub goroutine of the HTLC attempt to exit, then wait for them to
22✔
172
        // return.
22✔
173
        defer p.stop()
22✔
174

22✔
175
        // If we had any existing attempts outstanding, we'll start by spinning
22✔
176
        // up goroutines that'll collect their results and deliver them to the
22✔
177
        // lifecycle loop below.
22✔
178
        payment, err := p.router.cfg.Control.FetchPayment(p.identifier)
22✔
179
        if err != nil {
23✔
180
                return [32]byte{}, nil, err
1✔
181
        }
1✔
182

183
        for _, a := range payment.InFlightHTLCs() {
21✔
184
                a := a
×
185

×
186
                log.Infof("Resuming HTLC attempt %v for payment %v",
×
187
                        a.AttemptID, p.identifier)
×
188

×
189
                p.resultCollector(&a)
×
190
        }
×
191

192
        // exitWithErr is a helper closure that logs and returns an error.
193
        exitWithErr := func(err error) ([32]byte, *route.Route, error) {
26✔
194
                log.Errorf("Payment %v with status=%v failed: %v",
5✔
195
                        p.identifier, payment.GetStatus(), err)
5✔
196
                return [32]byte{}, nil, err
5✔
197
        }
5✔
198

199
        // We'll continue until either our payment succeeds, or we encounter a
200
        // critical error during path finding.
201
lifecycle:
21✔
202
        for {
90✔
203
                // We update the payment state on every iteration. Since the
69✔
204
                // payment state is affected by multiple goroutines (ie,
69✔
205
                // collectResultAsync), it is NOT guaranteed that we always
69✔
206
                // have the latest state here. This is fine as long as the
69✔
207
                // state is consistent as a whole.
69✔
208
                payment, err = p.router.cfg.Control.FetchPayment(p.identifier)
69✔
209
                if err != nil {
69✔
210
                        return exitWithErr(err)
×
211
                }
×
212

213
                ps := payment.GetState()
69✔
214
                remainingFees := p.calcFeeBudget(ps.FeesPaid)
69✔
215

69✔
216
                log.Debugf("Payment %v: status=%v, active_shards=%v, "+
69✔
217
                        "rem_value=%v, fee_limit=%v", p.identifier,
69✔
218
                        payment.GetStatus(), ps.NumAttemptsInFlight,
69✔
219
                        ps.RemainingAmt, remainingFees)
69✔
220

69✔
221
                // We now proceed our lifecycle with the following tasks in
69✔
222
                // order,
69✔
223
                //   1. check context.
69✔
224
                //   2. request route.
69✔
225
                //   3. create HTLC attempt.
69✔
226
                //   4. send HTLC attempt.
69✔
227
                //   5. collect HTLC attempt result.
69✔
228
                //
69✔
229
                // Before we attempt any new shard, we'll check to see if we've
69✔
230
                // gone past the payment attempt timeout, or if the context was
69✔
231
                // cancelled, or the router is exiting. In any of these cases,
69✔
232
                // we'll stop this payment attempt short.
69✔
233
                if err := p.checkContext(ctx); err != nil {
70✔
234
                        return exitWithErr(err)
1✔
235
                }
1✔
236

237
                // Now decide the next step of the current lifecycle.
238
                step, err := p.decideNextStep(payment)
68✔
239
                if err != nil {
69✔
240
                        return exitWithErr(err)
1✔
241
                }
1✔
242

243
                switch step {
67✔
244
                // Exit the for loop and return below.
245
                case stepExit:
16✔
246
                        break lifecycle
16✔
247

248
                // Continue the for loop and skip the rest.
249
                case stepSkip:
21✔
250
                        continue lifecycle
21✔
251

252
                // Continue the for loop and proceed the rest.
253
                case stepProceed:
30✔
254

255
                // Unknown step received, exit with an error.
256
                default:
×
257
                        err = fmt.Errorf("unknown step: %v", step)
×
258
                        return exitWithErr(err)
×
259
                }
260

261
                // Now request a route to be used to create our HTLC attempt.
262
                rt, err := p.requestRoute(ps)
30✔
263
                if err != nil {
31✔
264
                        return exitWithErr(err)
1✔
265
                }
1✔
266

267
                // We may not be able to find a route for current attempt. In
268
                // that case, we continue the loop and move straight to the
269
                // next iteration in case there are results for inflight HTLCs
270
                // that still need to be collected.
271
                if rt == nil {
31✔
272
                        log.Errorf("No route found for payment %v",
2✔
273
                                p.identifier)
2✔
274

2✔
275
                        continue lifecycle
2✔
276
                }
277

278
                log.Tracef("Found route: %s", spew.Sdump(rt.Hops))
27✔
279

27✔
280
                // Allow the traffic shaper to add custom records to the
27✔
281
                // outgoing HTLC and also adjust the amount if needed.
27✔
282
                err = p.amendFirstHopData(rt)
27✔
283
                if err != nil {
27✔
284
                        return exitWithErr(err)
×
285
                }
×
286

287
                // We found a route to try, create a new HTLC attempt to try.
288
                attempt, err := p.registerAttempt(rt, ps.RemainingAmt)
27✔
289
                if err != nil {
28✔
290
                        return exitWithErr(err)
1✔
291
                }
1✔
292

293
                // Once the attempt is created, send it to the htlcswitch.
294
                result, err := p.sendAttempt(attempt)
26✔
295
                if err != nil {
27✔
296
                        return exitWithErr(err)
1✔
297
                }
1✔
298

299
                // Now that the shard was successfully sent, launch a go
300
                // routine that will handle its result when its back.
301
                if result.err == nil {
49✔
302
                        p.resultCollector(attempt)
24✔
303
                }
24✔
304
        }
305

306
        // Once we are out the lifecycle loop, it means we've reached a
307
        // terminal condition. We either return the settled preimage or the
308
        // payment's failure reason.
309
        //
310
        // Optionally delete the failed attempts from the database.
311
        err = p.router.cfg.Control.DeleteFailedAttempts(p.identifier)
16✔
312
        if err != nil {
16✔
313
                log.Errorf("Error deleting failed htlc attempts for payment "+
×
314
                        "%v: %v", p.identifier, err)
×
315
        }
×
316

317
        htlc, failure := payment.TerminalInfo()
16✔
318
        if htlc != nil {
28✔
319
                return htlc.Settle.Preimage, &htlc.Route, nil
12✔
320
        }
12✔
321

322
        // Otherwise return the payment failure reason.
323
        return [32]byte{}, nil, *failure
4✔
324
}
325

326
// checkContext checks whether the payment context has been canceled.
327
// Cancellation occurs manually or if the context times out.
328
func (p *paymentLifecycle) checkContext(ctx context.Context) error {
72✔
329
        select {
72✔
330
        case <-ctx.Done():
4✔
331
                // If the context was canceled, we'll mark the payment as
4✔
332
                // failed. There are two cases to distinguish here: Either a
4✔
333
                // user-provided timeout was reached, or the context was
4✔
334
                // canceled, either to a manual cancellation or due to an
4✔
335
                // unknown error.
4✔
336
                var reason channeldb.FailureReason
4✔
337
                if errors.Is(ctx.Err(), context.DeadlineExceeded) {
7✔
338
                        reason = channeldb.FailureReasonTimeout
3✔
339
                        log.Warnf("Payment attempt not completed before "+
3✔
340
                                "timeout, id=%s", p.identifier.String())
3✔
341
                } else {
4✔
342
                        reason = channeldb.FailureReasonCanceled
1✔
343
                        log.Warnf("Payment attempt context canceled, id=%s",
1✔
344
                                p.identifier.String())
1✔
345
                }
1✔
346

347
                // By marking the payment failed, depending on whether it has
348
                // inflight HTLCs or not, its status will now either be
349
                // `StatusInflight` or `StatusFailed`. In either case, no more
350
                // HTLCs will be attempted.
351
                err := p.router.cfg.Control.FailPayment(p.identifier, reason)
4✔
352
                if err != nil {
5✔
353
                        return fmt.Errorf("FailPayment got %w", err)
1✔
354
                }
1✔
355

356
        case <-p.router.quit:
2✔
357
                return fmt.Errorf("check payment timeout got: %w",
2✔
358
                        ErrRouterShuttingDown)
2✔
359

360
        // Fall through if we haven't hit our time limit.
361
        default:
66✔
362
        }
363

364
        return nil
69✔
365
}
366

367
// requestRoute is responsible for finding a route to be used to create an HTLC
368
// attempt.
369
func (p *paymentLifecycle) requestRoute(
370
        ps *channeldb.MPPaymentState) (*route.Route, error) {
34✔
371

34✔
372
        remainingFees := p.calcFeeBudget(ps.FeesPaid)
34✔
373

34✔
374
        // Query our payment session to construct a route.
34✔
375
        rt, err := p.paySession.RequestRoute(
34✔
376
                ps.RemainingAmt, remainingFees,
34✔
377
                uint32(ps.NumAttemptsInFlight), uint32(p.currentHeight),
34✔
378
                p.firstHopCustomRecords,
34✔
379
        )
34✔
380

34✔
381
        // Exit early if there's no error.
34✔
382
        if err == nil {
62✔
383
                return rt, nil
28✔
384
        }
28✔
385

386
        // Otherwise we need to handle the error.
387
        log.Warnf("Failed to find route for payment %v: %v", p.identifier, err)
6✔
388

6✔
389
        // If the error belongs to `noRouteError` set, it means a non-critical
6✔
390
        // error has happened during path finding, and we will mark the payment
6✔
391
        // failed with this reason. Otherwise, we'll return the critical error
6✔
392
        // found to abort the lifecycle.
6✔
393
        var routeErr noRouteError
6✔
394
        if !errors.As(err, &routeErr) {
8✔
395
                return nil, fmt.Errorf("requestRoute got: %w", err)
2✔
396
        }
2✔
397

398
        // It's the `paymentSession`'s responsibility to find a route for us
399
        // with the best effort. When it cannot find a path, we need to treat it
400
        // as a terminal condition and fail the payment no matter it has
401
        // inflight HTLCs or not.
402
        failureCode := routeErr.FailureReason()
4✔
403
        log.Warnf("Marking payment %v permanently failed with no route: %v",
4✔
404
                p.identifier, failureCode)
4✔
405

4✔
406
        err = p.router.cfg.Control.FailPayment(p.identifier, failureCode)
4✔
407
        if err != nil {
5✔
408
                return nil, fmt.Errorf("FailPayment got: %w", err)
1✔
409
        }
1✔
410

411
        // NOTE: we decide to not return the non-critical noRouteError here to
412
        // avoid terminating the payment lifecycle as there might be other
413
        // inflight HTLCs which we must wait for their results.
414
        return nil, nil
3✔
415
}
416

417
// stop signals any active shard goroutine to exit.
418
func (p *paymentLifecycle) stop() {
23✔
419
        close(p.quit)
23✔
420
}
23✔
421

422
// attemptResult holds the HTLC attempt and a possible error returned from
423
// sending it.
424
type attemptResult struct {
425
        // err is non-nil if a non-critical error was encountered when trying
426
        // to send the attempt, and we successfully updated the control tower
427
        // to reflect this error. This can be errors like not enough local
428
        // balance for the given route etc.
429
        err error
430

431
        // attempt is the attempt structure as recorded in the database.
432
        attempt *channeldb.HTLCAttempt
433
}
434

435
// collectResultAsync launches a goroutine that will wait for the result of the
436
// given HTLC attempt to be available then handle its result. Once received, it
437
// will send a nil error to channel `resultCollected` to indicate there's a
438
// result.
439
func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
22✔
440
        log.Debugf("Collecting result for attempt %v in payment %v",
22✔
441
                attempt.AttemptID, p.identifier)
22✔
442

22✔
443
        go func() {
44✔
444
                // Block until the result is available.
22✔
445
                _, err := p.collectResult(attempt)
22✔
446
                if err != nil {
22✔
447
                        log.Errorf("Error collecting result for attempt %v "+
×
448
                                "in payment %v: %v", attempt.AttemptID,
×
449
                                p.identifier, err)
×
450
                }
×
451

452
                log.Debugf("Result collected for attempt %v in payment %v",
22✔
453
                        attempt.AttemptID, p.identifier)
22✔
454

22✔
455
                // Once the result is collected, we signal it by writing the
22✔
456
                // error to `resultCollected`.
22✔
457
                select {
22✔
458
                // Send the signal or quit.
459
                case p.resultCollected <- err:
22✔
460

461
                case <-p.quit:
×
462
                        log.Debugf("Lifecycle exiting while collecting "+
×
463
                                "result for payment %v", p.identifier)
×
464

465
                case <-p.router.quit:
×
466
                        return
×
467
                }
468
        }()
469
}
470

471
// collectResult waits for the result for the given attempt to be available
472
// from the Switch, then records the attempt outcome with the control tower.
473
// An attemptResult is returned, indicating the final outcome of this HTLC
474
// attempt.
475
func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
476
        *attemptResult, error) {
34✔
477

34✔
478
        log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt))
34✔
479

34✔
480
        // We'll retrieve the hash specific to this shard from the
34✔
481
        // shardTracker, since it will be needed to regenerate the circuit
34✔
482
        // below.
34✔
483
        hash, err := p.shardTracker.GetHash(attempt.AttemptID)
34✔
484
        if err != nil {
34✔
485
                return p.failAttempt(attempt.AttemptID, err)
×
486
        }
×
487

488
        // Regenerate the circuit for this attempt.
489
        _, circuit, err := generateSphinxPacket(
34✔
490
                &attempt.Route, hash[:], attempt.SessionKey(),
34✔
491
        )
34✔
492
        // TODO(yy): We generate this circuit to create the error decryptor,
34✔
493
        // which is then used in htlcswitch as the deobfuscator to decode the
34✔
494
        // error from `UpdateFailHTLC`. However, suppose it's an
34✔
495
        // `UpdateFulfillHTLC` message yet for some reason the sphinx packet is
34✔
496
        // failed to be generated, we'd miss settling it. This means we should
34✔
497
        // give it a second chance to try the settlement path in case
34✔
498
        // `GetAttemptResult` gives us back the preimage. And move the circuit
34✔
499
        // creation into htlcswitch so it's only constructed when there's a
34✔
500
        // failure message we need to decode.
34✔
501
        if err != nil {
34✔
502
                log.Debugf("Unable to generate circuit for attempt %v: %v",
×
503
                        attempt.AttemptID, err)
×
504

×
505
                return p.failAttempt(attempt.AttemptID, err)
×
506
        }
×
507

508
        // Using the created circuit, initialize the error decrypter, so we can
509
        // parse+decode any failures incurred by this payment within the
510
        // switch.
511
        errorDecryptor := &htlcswitch.SphinxErrorDecrypter{
34✔
512
                OnionErrorDecrypter: sphinx.NewOnionErrorDecrypter(circuit),
34✔
513
        }
34✔
514

34✔
515
        // Now ask the switch to return the result of the payment when
34✔
516
        // available.
34✔
517
        //
34✔
518
        // TODO(yy): consider using htlcswitch to create the `errorDecryptor`
34✔
519
        // since the htlc is already in db. This will also make the interface
34✔
520
        // `PaymentAttemptDispatcher` deeper and easier to use. Moreover, we'd
34✔
521
        // only create the decryptor when received a failure, further saving us
34✔
522
        // a few CPU cycles.
34✔
523
        resultChan, err := p.router.cfg.Payer.GetAttemptResult(
34✔
524
                attempt.AttemptID, p.identifier, errorDecryptor,
34✔
525
        )
34✔
526
        // Handle the switch error.
34✔
527
        if err != nil {
35✔
528
                log.Errorf("Failed getting result for attemptID %d "+
1✔
529
                        "from switch: %v", attempt.AttemptID, err)
1✔
530

1✔
531
                return p.handleSwitchErr(attempt, err)
1✔
532
        }
1✔
533

534
        // The switch knows about this payment, we'll wait for a result to be
535
        // available.
536
        var (
33✔
537
                result *htlcswitch.PaymentResult
33✔
538
                ok     bool
33✔
539
        )
33✔
540

33✔
541
        select {
33✔
542
        case result, ok = <-resultChan:
31✔
543
                if !ok {
32✔
544
                        return nil, htlcswitch.ErrSwitchExiting
1✔
545
                }
1✔
546

547
        case <-p.quit:
1✔
548
                return nil, ErrPaymentLifecycleExiting
1✔
549

550
        case <-p.router.quit:
1✔
551
                return nil, ErrRouterShuttingDown
1✔
552
        }
553

554
        // In case of a payment failure, fail the attempt with the control
555
        // tower and return.
556
        if result.Error != nil {
46✔
557
                return p.handleSwitchErr(attempt, result.Error)
16✔
558
        }
16✔
559

560
        // We successfully got a payment result back from the switch.
561
        log.Debugf("Payment %v succeeded with pid=%v",
14✔
562
                p.identifier, attempt.AttemptID)
14✔
563

14✔
564
        // Report success to mission control.
14✔
565
        err = p.router.cfg.MissionControl.ReportPaymentSuccess(
14✔
566
                attempt.AttemptID, &attempt.Route,
14✔
567
        )
14✔
568
        if err != nil {
14✔
569
                log.Errorf("Error reporting payment success to mc: %v", err)
×
570
        }
×
571

572
        // In case of success we atomically store settle result to the DB move
573
        // the shard to the settled state.
574
        htlcAttempt, err := p.router.cfg.Control.SettleAttempt(
14✔
575
                p.identifier, attempt.AttemptID,
14✔
576
                &channeldb.HTLCSettleInfo{
14✔
577
                        Preimage:   result.Preimage,
14✔
578
                        SettleTime: p.router.cfg.Clock.Now(),
14✔
579
                },
14✔
580
        )
14✔
581
        if err != nil {
15✔
582
                log.Errorf("Error settling attempt %v for payment %v with "+
1✔
583
                        "preimage %v: %v", attempt.AttemptID, p.identifier,
1✔
584
                        result.Preimage, err)
1✔
585

1✔
586
                // We won't mark the attempt as failed since we already have
1✔
587
                // the preimage.
1✔
588
                return nil, err
1✔
589
        }
1✔
590

591
        return &attemptResult{
13✔
592
                attempt: htlcAttempt,
13✔
593
        }, nil
13✔
594
}
595

596
// registerAttempt is responsible for creating and saving an HTLC attempt in db
597
// by using the route info provided. The `remainingAmt` is used to decide
598
// whether this is the last attempt.
599
func (p *paymentLifecycle) registerAttempt(rt *route.Route,
600
        remainingAmt lnwire.MilliSatoshi) (*channeldb.HTLCAttempt, error) {
36✔
601

36✔
602
        // If this route will consume the last remaining amount to send
36✔
603
        // to the receiver, this will be our last shard (for now).
36✔
604
        isLastAttempt := rt.ReceiverAmt() == remainingAmt
36✔
605

36✔
606
        // Using the route received from the payment session, create a new
36✔
607
        // shard to send.
36✔
608
        attempt, err := p.createNewPaymentAttempt(rt, isLastAttempt)
36✔
609
        if err != nil {
37✔
610
                return nil, err
1✔
611
        }
1✔
612

613
        // Before sending this HTLC to the switch, we checkpoint the fresh
614
        // paymentID and route to the DB. This lets us know on startup the ID
615
        // of the payment that we attempted to send, such that we can query the
616
        // Switch for its whereabouts. The route is needed to handle the result
617
        // when it eventually comes back.
618
        err = p.router.cfg.Control.RegisterAttempt(
35✔
619
                p.identifier, &attempt.HTLCAttemptInfo,
35✔
620
        )
35✔
621

35✔
622
        return attempt, err
35✔
623
}
624

625
// createNewPaymentAttempt creates a new payment attempt from the given route.
626
func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route,
627
        lastShard bool) (*channeldb.HTLCAttempt, error) {
36✔
628

36✔
629
        // Generate a new key to be used for this attempt.
36✔
630
        sessionKey, err := generateNewSessionKey()
36✔
631
        if err != nil {
36✔
632
                return nil, err
×
633
        }
×
634

635
        // We generate a new, unique payment ID that we will use for
636
        // this HTLC.
637
        attemptID, err := p.router.cfg.NextPaymentID()
36✔
638
        if err != nil {
36✔
639
                return nil, err
×
640
        }
×
641

642
        // Request a new shard from the ShardTracker. If this is an AMP
643
        // payment, and this is the last shard, the outstanding shards together
644
        // with this one will be enough for the receiver to derive all HTLC
645
        // preimages. If this a non-AMP payment, the ShardTracker will return a
646
        // simple shard with the payment's static payment hash.
647
        shard, err := p.shardTracker.NewShard(attemptID, lastShard)
36✔
648
        if err != nil {
37✔
649
                return nil, err
1✔
650
        }
1✔
651

652
        // If this shard carries MPP or AMP options, add them to the last hop
653
        // on the route.
654
        hop := rt.Hops[len(rt.Hops)-1]
35✔
655
        if shard.MPP() != nil {
39✔
656
                hop.MPP = shard.MPP()
4✔
657
        }
4✔
658

659
        if shard.AMP() != nil {
35✔
660
                hop.AMP = shard.AMP()
×
661
        }
×
662

663
        hash := shard.Hash()
35✔
664

35✔
665
        // We now have all the information needed to populate the current
35✔
666
        // attempt information.
35✔
667
        attempt := channeldb.NewHtlcAttempt(
35✔
668
                attemptID, sessionKey, *rt, p.router.cfg.Clock.Now(), &hash,
35✔
669
        )
35✔
670

35✔
671
        return attempt, nil
35✔
672
}
673

674
// sendAttempt attempts to send the current attempt to the switch to complete
675
// the payment. If this attempt fails, then we'll continue on to the next
676
// available route.
677
func (p *paymentLifecycle) sendAttempt(
678
        attempt *channeldb.HTLCAttempt) (*attemptResult, error) {
35✔
679

35✔
680
        log.Debugf("Sending HTLC attempt(id=%v, total_amt=%v, first_hop_amt=%d"+
35✔
681
                ") for payment %v", attempt.AttemptID,
35✔
682
                attempt.Route.TotalAmount, attempt.Route.FirstHopAmount.Val,
35✔
683
                p.identifier)
35✔
684

35✔
685
        rt := attempt.Route
35✔
686

35✔
687
        // Construct the first hop.
35✔
688
        firstHop := lnwire.NewShortChanIDFromInt(rt.Hops[0].ChannelID)
35✔
689

35✔
690
        // Craft an HTLC packet to send to the htlcswitch. The metadata within
35✔
691
        // this packet will be used to route the payment through the network,
35✔
692
        // starting with the first-hop.
35✔
693
        htlcAdd := &lnwire.UpdateAddHTLC{
35✔
694
                Amount:        rt.FirstHopAmount.Val.Int(),
35✔
695
                Expiry:        rt.TotalTimeLock,
35✔
696
                PaymentHash:   *attempt.Hash,
35✔
697
                CustomRecords: rt.FirstHopWireCustomRecords,
35✔
698
        }
35✔
699

35✔
700
        // Generate the raw encoded sphinx packet to be included along
35✔
701
        // with the htlcAdd message that we send directly to the
35✔
702
        // switch.
35✔
703
        onionBlob, _, err := generateSphinxPacket(
35✔
704
                &rt, attempt.Hash[:], attempt.SessionKey(),
35✔
705
        )
35✔
706
        if err != nil {
36✔
707
                log.Errorf("Failed to create onion blob: attempt=%d in "+
1✔
708
                        "payment=%v, err:%v", attempt.AttemptID,
1✔
709
                        p.identifier, err)
1✔
710

1✔
711
                return p.failAttempt(attempt.AttemptID, err)
1✔
712
        }
1✔
713

714
        copy(htlcAdd.OnionBlob[:], onionBlob)
34✔
715

34✔
716
        // Send it to the Switch. When this method returns we assume
34✔
717
        // the Switch successfully has persisted the payment attempt,
34✔
718
        // such that we can resume waiting for the result after a
34✔
719
        // restart.
34✔
720
        err = p.router.cfg.Payer.SendHTLC(firstHop, attempt.AttemptID, htlcAdd)
34✔
721
        if err != nil {
39✔
722
                log.Errorf("Failed sending attempt %d for payment %v to "+
5✔
723
                        "switch: %v", attempt.AttemptID, p.identifier, err)
5✔
724

5✔
725
                return p.handleSwitchErr(attempt, err)
5✔
726
        }
5✔
727

728
        log.Debugf("Attempt %v for payment %v successfully sent to switch, "+
29✔
729
                "route: %v", attempt.AttemptID, p.identifier, &attempt.Route)
29✔
730

29✔
731
        return &attemptResult{
29✔
732
                attempt: attempt,
29✔
733
        }, nil
29✔
734
}
735

736
// amendFirstHopData is a function that calls the traffic shaper to allow it to
737
// add custom records to the outgoing HTLC and also adjust the amount if
738
// needed.
739
func (p *paymentLifecycle) amendFirstHopData(rt *route.Route) error {
36✔
740
        // The first hop amount on the route is the full route amount if not
36✔
741
        // overwritten by the traffic shaper. So we set the initial value now
36✔
742
        // and potentially overwrite it later.
36✔
743
        rt.FirstHopAmount = tlv.NewRecordT[tlv.TlvType0](
36✔
744
                tlv.NewBigSizeT(rt.TotalAmount),
36✔
745
        )
36✔
746

36✔
747
        // By default, we set the first hop custom records to the initial
36✔
748
        // value requested by the RPC. The traffic shaper may overwrite this
36✔
749
        // value.
36✔
750
        rt.FirstHopWireCustomRecords = p.firstHopCustomRecords
36✔
751

36✔
752
        // extraDataRequest is a helper struct to pass the custom records and
36✔
753
        // amount back from the traffic shaper.
36✔
754
        type extraDataRequest struct {
36✔
755
                customRecords fn.Option[lnwire.CustomRecords]
36✔
756

36✔
757
                amount fn.Option[lnwire.MilliSatoshi]
36✔
758
        }
36✔
759

36✔
760
        // If a hook exists that may affect our outgoing message, we call it now
36✔
761
        // and apply its side effects to the UpdateAddHTLC message.
36✔
762
        result, err := fn.MapOptionZ(
36✔
763
                p.router.cfg.TrafficShaper,
36✔
764
                //nolint:ll
36✔
765
                func(ts htlcswitch.AuxTrafficShaper) fn.Result[extraDataRequest] {
72✔
766
                        newAmt, newRecords, err := ts.ProduceHtlcExtraData(
36✔
767
                                rt.TotalAmount, p.firstHopCustomRecords,
36✔
768
                        )
36✔
769
                        if err != nil {
36✔
770
                                return fn.Err[extraDataRequest](err)
×
771
                        }
×
772

773
                        // Make sure we only received valid records.
774
                        if err := newRecords.Validate(); err != nil {
36✔
775
                                return fn.Err[extraDataRequest](err)
×
776
                        }
×
777

778
                        log.Debugf("Aux traffic shaper returned custom "+
36✔
779
                                "records %v and amount %d msat for HTLC",
36✔
780
                                spew.Sdump(newRecords), newAmt)
36✔
781

36✔
782
                        return fn.Ok(extraDataRequest{
36✔
783
                                customRecords: fn.Some(newRecords),
36✔
784
                                amount:        fn.Some(newAmt),
36✔
785
                        })
36✔
786
                },
787
        ).Unpack()
788
        if err != nil {
36✔
789
                return fmt.Errorf("traffic shaper failed to produce extra "+
×
790
                        "data: %w", err)
×
791
        }
×
792

793
        // Apply the side effects to the UpdateAddHTLC message.
794
        result.customRecords.WhenSome(func(records lnwire.CustomRecords) {
72✔
795
                rt.FirstHopWireCustomRecords = records
36✔
796
        })
36✔
797
        result.amount.WhenSome(func(amount lnwire.MilliSatoshi) {
72✔
798
                rt.FirstHopAmount = tlv.NewRecordT[tlv.TlvType0](
36✔
799
                        tlv.NewBigSizeT(amount),
36✔
800
                )
36✔
801
        })
36✔
802

803
        return nil
36✔
804
}
805

806
// failAttemptAndPayment fails both the payment and its attempt via the
807
// router's control tower, which marks the payment as failed in db.
808
func (p *paymentLifecycle) failPaymentAndAttempt(
809
        attemptID uint64, reason *channeldb.FailureReason,
810
        sendErr error) (*attemptResult, error) {
5✔
811

5✔
812
        log.Errorf("Payment %v failed: final_outcome=%v, raw_err=%v",
5✔
813
                p.identifier, *reason, sendErr)
5✔
814

5✔
815
        // Fail the payment via control tower.
5✔
816
        //
5✔
817
        // NOTE: we must fail the payment first before failing the attempt.
5✔
818
        // Otherwise, once the attempt is marked as failed, another goroutine
5✔
819
        // might make another attempt while we are failing the payment.
5✔
820
        err := p.router.cfg.Control.FailPayment(p.identifier, *reason)
5✔
821
        if err != nil {
5✔
822
                log.Errorf("Unable to fail payment: %v", err)
×
823
                return nil, err
×
824
        }
×
825

826
        // Fail the attempt.
827
        return p.failAttempt(attemptID, sendErr)
5✔
828
}
829

830
// handleSwitchErr inspects the given error from the Switch and determines
831
// whether we should make another payment attempt, or if it should be
832
// considered a terminal error. Terminal errors will be recorded with the
833
// control tower. It analyzes the sendErr for the payment attempt received from
834
// the switch and updates mission control and/or channel policies. Depending on
835
// the error type, the error is either the final outcome of the payment or we
836
// need to continue with an alternative route. A final outcome is indicated by
837
// a non-nil reason value.
838
func (p *paymentLifecycle) handleSwitchErr(attempt *channeldb.HTLCAttempt,
839
        sendErr error) (*attemptResult, error) {
22✔
840

22✔
841
        internalErrorReason := channeldb.FailureReasonError
22✔
842
        attemptID := attempt.AttemptID
22✔
843

22✔
844
        // reportAndFail is a helper closure that reports the failure to the
22✔
845
        // mission control, which helps us to decide whether we want to retry
22✔
846
        // the payment or not. If a non nil reason is returned from mission
22✔
847
        // control, it will further fail the payment via control tower.
22✔
848
        reportAndFail := func(srcIdx *int,
22✔
849
                msg lnwire.FailureMessage) (*attemptResult, error) {
41✔
850

19✔
851
                // Report outcome to mission control.
19✔
852
                reason, err := p.router.cfg.MissionControl.ReportPaymentFail(
19✔
853
                        attemptID, &attempt.Route, srcIdx, msg,
19✔
854
                )
19✔
855
                if err != nil {
19✔
856
                        log.Errorf("Error reporting payment result to mc: %v",
×
857
                                err)
×
858

×
859
                        reason = &internalErrorReason
×
860
                }
×
861

862
                // Fail the attempt only if there's no reason.
863
                if reason == nil {
36✔
864
                        // Fail the attempt.
17✔
865
                        return p.failAttempt(attemptID, sendErr)
17✔
866
                }
17✔
867

868
                // Otherwise fail both the payment and the attempt.
869
                return p.failPaymentAndAttempt(attemptID, reason, sendErr)
2✔
870
        }
871

872
        // If this attempt ID is unknown to the Switch, it means it was never
873
        // checkpointed and forwarded by the switch before a restart. In this
874
        // case we can safely send a new payment attempt, and wait for its
875
        // result to be available.
876
        if errors.Is(sendErr, htlcswitch.ErrPaymentIDNotFound) {
22✔
877
                log.Debugf("Attempt ID %v for payment %v not found in the "+
×
878
                        "Switch, retrying.", attempt.AttemptID, p.identifier)
×
879

×
880
                return p.failAttempt(attemptID, sendErr)
×
881
        }
×
882

883
        if errors.Is(sendErr, htlcswitch.ErrUnreadableFailureMessage) {
23✔
884
                log.Warn("Unreadable failure when sending htlc: id=%v, hash=%v",
1✔
885
                        attempt.AttemptID, attempt.Hash)
1✔
886

1✔
887
                // Since this error message cannot be decrypted, we will send a
1✔
888
                // nil error message to our mission controller and fail the
1✔
889
                // payment.
1✔
890
                return reportAndFail(nil, nil)
1✔
891
        }
1✔
892

893
        // If the error is a ClearTextError, we have received a valid wire
894
        // failure message, either from our own outgoing link or from a node
895
        // down the route. If the error is not related to the propagation of
896
        // our payment, we can stop trying because an internal error has
897
        // occurred.
898
        var rtErr htlcswitch.ClearTextError
21✔
899
        ok := errors.As(sendErr, &rtErr)
21✔
900
        if !ok {
24✔
901
                return p.failPaymentAndAttempt(
3✔
902
                        attemptID, &internalErrorReason, sendErr,
3✔
903
                )
3✔
904
        }
3✔
905

906
        // failureSourceIdx is the index of the node that the failure occurred
907
        // at. If the ClearTextError received is not a ForwardingError the
908
        // payment error occurred at our node, so we leave this value as 0
909
        // to indicate that the failure occurred locally. If the error is a
910
        // ForwardingError, it did not originate at our node, so we set
911
        // failureSourceIdx to the index of the node where the failure occurred.
912
        failureSourceIdx := 0
18✔
913
        var source *htlcswitch.ForwardingError
18✔
914
        ok = errors.As(rtErr, &source)
18✔
915
        if ok {
36✔
916
                failureSourceIdx = source.FailureSourceIdx
18✔
917
        }
18✔
918

919
        // Extract the wire failure and apply channel update if it contains one.
920
        // If we received an unknown failure message from a node along the
921
        // route, the failure message will be nil.
922
        failureMessage := rtErr.WireMessage()
18✔
923
        err := p.handleFailureMessage(
18✔
924
                &attempt.Route, failureSourceIdx, failureMessage,
18✔
925
        )
18✔
926
        if err != nil {
18✔
927
                return p.failPaymentAndAttempt(
×
928
                        attemptID, &internalErrorReason, sendErr,
×
929
                )
×
930
        }
×
931

932
        log.Tracef("Node=%v reported failure when sending htlc",
18✔
933
                failureSourceIdx)
18✔
934

18✔
935
        return reportAndFail(&failureSourceIdx, failureMessage)
18✔
936
}
937

938
// handleFailureMessage tries to apply a channel update present in the failure
939
// message if any.
940
func (p *paymentLifecycle) handleFailureMessage(rt *route.Route,
941
        errorSourceIdx int, failure lnwire.FailureMessage) error {
18✔
942

18✔
943
        if failure == nil {
19✔
944
                return nil
1✔
945
        }
1✔
946

947
        // It makes no sense to apply our own channel updates.
948
        if errorSourceIdx == 0 {
17✔
949
                log.Errorf("Channel update of ourselves received")
×
950

×
951
                return nil
×
952
        }
×
953

954
        // Extract channel update if the error contains one.
955
        update := p.router.extractChannelUpdate(failure)
17✔
956
        if update == nil {
26✔
957
                return nil
9✔
958
        }
9✔
959

960
        // Parse pubkey to allow validation of the channel update. This should
961
        // always succeed, otherwise there is something wrong in our
962
        // implementation. Therefore, return an error.
963
        errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
8✔
964
        errSource, err := btcec.ParsePubKey(errVertex[:])
8✔
965
        if err != nil {
8✔
966
                log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
×
967
                        errorSourceIdx, errVertex)
×
968

×
969
                return err
×
970
        }
×
971

972
        var (
8✔
973
                isAdditionalEdge bool
8✔
974
                policy           *models.CachedEdgePolicy
8✔
975
        )
8✔
976

8✔
977
        // Before we apply the channel update, we need to decide whether the
8✔
978
        // update is for additional (ephemeral) edge or normal edge stored in
8✔
979
        // db.
8✔
980
        //
8✔
981
        // Note: the p.paySession might be nil here if it's called inside
8✔
982
        // SendToRoute where there's no payment lifecycle.
8✔
983
        if p.paySession != nil {
13✔
984
                policy = p.paySession.GetAdditionalEdgePolicy(
5✔
985
                        errSource, update.ShortChannelID.ToUint64(),
5✔
986
                )
5✔
987
                if policy != nil {
7✔
988
                        isAdditionalEdge = true
2✔
989
                }
2✔
990
        }
991

992
        // Apply channel update to additional edge policy.
993
        if isAdditionalEdge {
10✔
994
                if !p.paySession.UpdateAdditionalEdge(
2✔
995
                        update, errSource, policy) {
2✔
996

×
997
                        log.Debugf("Invalid channel update received: node=%v",
×
998
                                errVertex)
×
999
                }
×
1000
                return nil
2✔
1001
        }
1002

1003
        // Apply channel update to the channel edge policy in our db.
1004
        if !p.router.cfg.ApplyChannelUpdate(update) {
8✔
1005
                log.Debugf("Invalid channel update received: node=%v",
2✔
1006
                        errVertex)
2✔
1007
        }
2✔
1008
        return nil
6✔
1009
}
1010

1011
// failAttempt calls control tower to fail the current payment attempt.
1012
func (p *paymentLifecycle) failAttempt(attemptID uint64,
1013
        sendError error) (*attemptResult, error) {
23✔
1014

23✔
1015
        log.Warnf("Attempt %v for payment %v failed: %v", attemptID,
23✔
1016
                p.identifier, sendError)
23✔
1017

23✔
1018
        failInfo := marshallError(
23✔
1019
                sendError,
23✔
1020
                p.router.cfg.Clock.Now(),
23✔
1021
        )
23✔
1022

23✔
1023
        // Now that we are failing this payment attempt, cancel the shard with
23✔
1024
        // the ShardTracker such that it can derive the correct hash for the
23✔
1025
        // next attempt.
23✔
1026
        if err := p.shardTracker.CancelShard(attemptID); err != nil {
23✔
1027
                return nil, err
×
1028
        }
×
1029

1030
        attempt, err := p.router.cfg.Control.FailAttempt(
23✔
1031
                p.identifier, attemptID, failInfo,
23✔
1032
        )
23✔
1033
        if err != nil {
26✔
1034
                return nil, err
3✔
1035
        }
3✔
1036

1037
        return &attemptResult{
20✔
1038
                attempt: attempt,
20✔
1039
                err:     sendError,
20✔
1040
        }, nil
20✔
1041
}
1042

1043
// marshallError marshall an error as received from the switch to a structure
1044
// that is suitable for database storage.
1045
func marshallError(sendError error, time time.Time) *channeldb.HTLCFailInfo {
23✔
1046
        response := &channeldb.HTLCFailInfo{
23✔
1047
                FailTime: time,
23✔
1048
        }
23✔
1049

23✔
1050
        switch {
23✔
1051
        case errors.Is(sendError, htlcswitch.ErrPaymentIDNotFound):
×
1052
                response.Reason = channeldb.HTLCFailInternal
×
1053
                return response
×
1054

1055
        case errors.Is(sendError, htlcswitch.ErrUnreadableFailureMessage):
1✔
1056
                response.Reason = channeldb.HTLCFailUnreadable
1✔
1057
                return response
1✔
1058
        }
1059

1060
        var rtErr htlcswitch.ClearTextError
22✔
1061
        ok := errors.As(sendError, &rtErr)
22✔
1062
        if !ok {
26✔
1063
                response.Reason = channeldb.HTLCFailInternal
4✔
1064
                return response
4✔
1065
        }
4✔
1066

1067
        message := rtErr.WireMessage()
18✔
1068
        if message != nil {
35✔
1069
                response.Reason = channeldb.HTLCFailMessage
17✔
1070
                response.Message = message
17✔
1071
        } else {
18✔
1072
                response.Reason = channeldb.HTLCFailUnknown
1✔
1073
        }
1✔
1074

1075
        // If the ClearTextError received is a ForwardingError, the error
1076
        // originated from a node along the route, not locally on our outgoing
1077
        // link. We set failureSourceIdx to the index of the node where the
1078
        // failure occurred. If the error is not a ForwardingError, the failure
1079
        // occurred at our node, so we leave the index as 0 to indicate that
1080
        // we failed locally.
1081
        var fErr *htlcswitch.ForwardingError
18✔
1082
        ok = errors.As(rtErr, &fErr)
18✔
1083
        if ok {
36✔
1084
                response.FailureSourceIndex = uint32(fErr.FailureSourceIdx)
18✔
1085
        }
18✔
1086

1087
        return response
18✔
1088
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc