• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12430728843

20 Dec 2024 11:36AM UTC coverage: 61.336% (+2.6%) from 58.716%
12430728843

Pull #8777

github

ziggie1984
channeldb: fix typo.
Pull Request #8777: multi: make reassignment of alias channel edge atomic

161 of 213 new or added lines in 7 files covered. (75.59%)

70 existing lines in 17 files now uncovered.

23369 of 38100 relevant lines covered (61.34%)

115813.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.95
/routing/payment_lifecycle.go
1
package routing
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "time"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/davecgh/go-spew/spew"
11
        sphinx "github.com/lightningnetwork/lightning-onion"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/fn/v2"
14
        "github.com/lightningnetwork/lnd/graph/db/models"
15
        "github.com/lightningnetwork/lnd/htlcswitch"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing/route"
19
        "github.com/lightningnetwork/lnd/routing/shards"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
// ErrPaymentLifecycleExiting is used when waiting for htlc attempt result, but
24
// the payment lifecycle is exiting.
25
var ErrPaymentLifecycleExiting = errors.New("payment lifecycle exiting")
26

27
// paymentLifecycle holds all information about the current state of a payment
28
// needed to resume if from any point.
29
type paymentLifecycle struct {
30
        router                *ChannelRouter
31
        feeLimit              lnwire.MilliSatoshi
32
        identifier            lntypes.Hash
33
        paySession            PaymentSession
34
        shardTracker          shards.ShardTracker
35
        currentHeight         int32
36
        firstHopCustomRecords lnwire.CustomRecords
37

38
        // quit is closed to signal the sub goroutines of the payment lifecycle
39
        // to stop.
40
        quit chan struct{}
41

42
        // resultCollected is used to signal that the result of an attempt has
43
        // been collected. A nil error means the attempt is either successful
44
        // or failed with temporary error. Otherwise, we should exit the
45
        // lifecycle loop as a terminal error has occurred.
46
        resultCollected chan error
47

48
        // resultCollector is a function that is used to collect the result of
49
        // an HTLC attempt, which is always mounted to `p.collectResultAsync`
50
        // except in unit test, where we use a much simpler resultCollector to
51
        // decouple the test flow for the payment lifecycle.
52
        resultCollector func(attempt *channeldb.HTLCAttempt)
53
}
54

55
// newPaymentLifecycle initiates a new payment lifecycle and returns it.
56
func newPaymentLifecycle(r *ChannelRouter, feeLimit lnwire.MilliSatoshi,
57
        identifier lntypes.Hash, paySession PaymentSession,
58
        shardTracker shards.ShardTracker, currentHeight int32,
59
        firstHopCustomRecords lnwire.CustomRecords) *paymentLifecycle {
44✔
60

44✔
61
        p := &paymentLifecycle{
44✔
62
                router:                r,
44✔
63
                feeLimit:              feeLimit,
44✔
64
                identifier:            identifier,
44✔
65
                paySession:            paySession,
44✔
66
                shardTracker:          shardTracker,
44✔
67
                currentHeight:         currentHeight,
44✔
68
                quit:                  make(chan struct{}),
44✔
69
                resultCollected:       make(chan error, 1),
44✔
70
                firstHopCustomRecords: firstHopCustomRecords,
44✔
71
        }
44✔
72

44✔
73
        // Mount the result collector.
44✔
74
        p.resultCollector = p.collectResultAsync
44✔
75

44✔
76
        return p
44✔
77
}
44✔
78

79
// calcFeeBudget returns the available fee to be used for sending HTLC
80
// attempts.
81
func (p *paymentLifecycle) calcFeeBudget(
82
        feesPaid lnwire.MilliSatoshi) lnwire.MilliSatoshi {
107✔
83

107✔
84
        budget := p.feeLimit
107✔
85

107✔
86
        // We'll subtract the used fee from our fee budget. In case of
107✔
87
        // overflow, we need to check whether feesPaid exceeds our budget
107✔
88
        // already.
107✔
89
        if feesPaid <= budget {
214✔
90
                budget -= feesPaid
107✔
91
        } else {
111✔
92
                budget = 0
4✔
93
        }
4✔
94

95
        return budget
107✔
96
}
97

98
// stateStep enumerates the actions the payment lifecycle loop can take after
// inspecting the payment state: skip the current iteration, proceed with it,
// or exit the loop.
type stateStep uint8

const (
	// stepSkip tells the lifecycle loop to abandon the current iteration
	// and start the next one.
	stepSkip stateStep = iota

	// stepProceed tells the lifecycle loop to carry on with the current
	// iteration.
	stepProceed

	// stepExit tells the lifecycle loop to stop iterating.
	stepExit
)
113

114
// decideNextStep is used to determine the next step in the payment lifecycle.
115
func (p *paymentLifecycle) decideNextStep(
116
        payment DBMPPayment) (stateStep, error) {
78✔
117

78✔
118
        // Check whether we could make new HTLC attempts.
78✔
119
        allow, err := payment.AllowMoreAttempts()
78✔
120
        if err != nil {
80✔
121
                return stepExit, err
2✔
122
        }
2✔
123

124
        if !allow {
121✔
125
                // Check whether we need to wait for results.
45✔
126
                wait, err := payment.NeedWaitAttempts()
45✔
127
                if err != nil {
46✔
128
                        return stepExit, err
1✔
129
                }
1✔
130

131
                // If we are not allowed to make new HTLC attempts and there's
132
                // no need to wait, the lifecycle is done and we can exit.
133
                if !wait {
65✔
134
                        return stepExit, nil
21✔
135
                }
21✔
136

137
                log.Tracef("Waiting for attempt results for payment %v",
27✔
138
                        p.identifier)
27✔
139

27✔
140
                // Otherwise we wait for one HTLC attempt then continue
27✔
141
                // the lifecycle.
27✔
142
                //
27✔
143
                // NOTE: we don't check `p.quit` since `decideNextStep` is
27✔
144
                // running in the same goroutine as `resumePayment`.
27✔
145
                select {
27✔
146
                case err := <-p.resultCollected:
26✔
147
                        // If an error is returned, exit with it.
26✔
148
                        if err != nil {
30✔
149
                                return stepExit, err
4✔
150
                        }
4✔
151

152
                        log.Tracef("Received attempt result for payment %v",
26✔
153
                                p.identifier)
26✔
154

155
                case <-p.router.quit:
1✔
156
                        return stepExit, ErrRouterShuttingDown
1✔
157
                }
158

159
                return stepSkip, nil
26✔
160
        }
161

162
        // Otherwise we need to make more attempts.
163
        return stepProceed, nil
35✔
164
}
165

166
// resumePayment resumes the paymentLifecycle from the current state.
167
func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte,
168
        *route.Route, error) {
26✔
169

26✔
170
        // When the payment lifecycle loop exits, we make sure to signal any
26✔
171
        // sub goroutine of the HTLC attempt to exit, then wait for them to
26✔
172
        // return.
26✔
173
        defer p.stop()
26✔
174

26✔
175
        // If we had any existing attempts outstanding, we'll start by spinning
26✔
176
        // up goroutines that'll collect their results and deliver them to the
26✔
177
        // lifecycle loop below.
26✔
178
        payment, err := p.router.cfg.Control.FetchPayment(p.identifier)
26✔
179
        if err != nil {
27✔
180
                return [32]byte{}, nil, err
1✔
181
        }
1✔
182

183
        for _, a := range payment.InFlightHTLCs() {
29✔
184
                a := a
4✔
185

4✔
186
                log.Infof("Resuming HTLC attempt %v for payment %v",
4✔
187
                        a.AttemptID, p.identifier)
4✔
188

4✔
189
                p.resultCollector(&a)
4✔
190
        }
4✔
191

192
        // Get the payment status.
193
        status := payment.GetStatus()
25✔
194

25✔
195
        // exitWithErr is a helper closure that logs and returns an error.
25✔
196
        exitWithErr := func(err error) ([32]byte, *route.Route, error) {
34✔
197
                // Log an error with the latest payment status.
9✔
198
                //
9✔
199
                // NOTE: this `status` variable is reassigned in the loop
9✔
200
                // below. We could also call `payment.GetStatus` here, but in a
9✔
201
                // rare case when the critical log is triggered when using
9✔
202
                // postgres as db backend, the `payment` could be nil, causing
9✔
203
                // the payment fetching to return an error.
9✔
204
                log.Errorf("Payment %v with status=%v failed: %v", p.identifier,
9✔
205
                        status, err)
9✔
206

9✔
207
                return [32]byte{}, nil, err
9✔
208
        }
9✔
209

210
        // We'll continue until either our payment succeeds, or we encounter a
211
        // critical error during path finding.
212
lifecycle:
25✔
213
        for {
98✔
214
                // We update the payment state on every iteration. Since the
73✔
215
                // payment state is affected by multiple goroutines (ie,
73✔
216
                // collectResultAsync), it is NOT guaranteed that we always
73✔
217
                // have the latest state here. This is fine as long as the
73✔
218
                // state is consistent as a whole.
73✔
219
                payment, err = p.router.cfg.Control.FetchPayment(p.identifier)
73✔
220
                if err != nil {
73✔
221
                        return exitWithErr(err)
×
222
                }
×
223

224
                ps := payment.GetState()
73✔
225
                remainingFees := p.calcFeeBudget(ps.FeesPaid)
73✔
226

73✔
227
                status = payment.GetStatus()
73✔
228
                log.Debugf("Payment %v: status=%v, active_shards=%v, "+
73✔
229
                        "rem_value=%v, fee_limit=%v", p.identifier, status,
73✔
230
                        ps.NumAttemptsInFlight, ps.RemainingAmt, remainingFees)
73✔
231

73✔
232
                // We now proceed our lifecycle with the following tasks in
73✔
233
                // order,
73✔
234
                //   1. check context.
73✔
235
                //   2. request route.
73✔
236
                //   3. create HTLC attempt.
73✔
237
                //   4. send HTLC attempt.
73✔
238
                //   5. collect HTLC attempt result.
73✔
239
                //
73✔
240
                // Before we attempt any new shard, we'll check to see if we've
73✔
241
                // gone past the payment attempt timeout, or if the context was
73✔
242
                // cancelled, or the router is exiting. In any of these cases,
73✔
243
                // we'll stop this payment attempt short.
73✔
244
                if err := p.checkContext(ctx); err != nil {
74✔
245
                        return exitWithErr(err)
1✔
246
                }
1✔
247

248
                // Now decide the next step of the current lifecycle.
249
                step, err := p.decideNextStep(payment)
72✔
250
                if err != nil {
77✔
251
                        return exitWithErr(err)
5✔
252
                }
5✔
253

254
                switch step {
71✔
255
                // Exit the for loop and return below.
256
                case stepExit:
20✔
257
                        break lifecycle
20✔
258

259
                // Continue the for loop and skip the rest.
260
                case stepSkip:
25✔
261
                        continue lifecycle
25✔
262

263
                // Continue the for loop and proceed the rest.
264
                case stepProceed:
34✔
265

266
                // Unknown step received, exit with an error.
267
                default:
×
268
                        err = fmt.Errorf("unknown step: %v", step)
×
269
                        return exitWithErr(err)
×
270
                }
271

272
                // Now request a route to be used to create our HTLC attempt.
273
                rt, err := p.requestRoute(ps)
34✔
274
                if err != nil {
35✔
275
                        return exitWithErr(err)
1✔
276
                }
1✔
277

278
                // We may not be able to find a route for current attempt. In
279
                // that case, we continue the loop and move straight to the
280
                // next iteration in case there are results for inflight HTLCs
281
                // that still need to be collected.
282
                if rt == nil {
39✔
283
                        log.Errorf("No route found for payment %v",
6✔
284
                                p.identifier)
6✔
285

6✔
286
                        continue lifecycle
6✔
287
                }
288

289
                log.Tracef("Found route: %s", spew.Sdump(rt.Hops))
31✔
290

31✔
291
                // Allow the traffic shaper to add custom records to the
31✔
292
                // outgoing HTLC and also adjust the amount if needed.
31✔
293
                err = p.amendFirstHopData(rt)
31✔
294
                if err != nil {
31✔
295
                        return exitWithErr(err)
×
296
                }
×
297

298
                // We found a route to try, create a new HTLC attempt to try.
299
                attempt, err := p.registerAttempt(rt, ps.RemainingAmt)
31✔
300
                if err != nil {
32✔
301
                        return exitWithErr(err)
1✔
302
                }
1✔
303

304
                // Once the attempt is created, send it to the htlcswitch.
305
                result, err := p.sendAttempt(attempt)
30✔
306
                if err != nil {
31✔
307
                        return exitWithErr(err)
1✔
308
                }
1✔
309

310
                // Now that the shard was successfully sent, launch a go
311
                // routine that will handle its result when its back.
312
                if result.err == nil {
57✔
313
                        p.resultCollector(attempt)
28✔
314
                }
28✔
315
        }
316

317
        // Once we are out the lifecycle loop, it means we've reached a
318
        // terminal condition. We either return the settled preimage or the
319
        // payment's failure reason.
320
        //
321
        // Optionally delete the failed attempts from the database.
322
        err = p.router.cfg.Control.DeleteFailedAttempts(p.identifier)
20✔
323
        if err != nil {
20✔
324
                log.Errorf("Error deleting failed htlc attempts for payment "+
×
325
                        "%v: %v", p.identifier, err)
×
326
        }
×
327

328
        htlc, failure := payment.TerminalInfo()
20✔
329
        if htlc != nil {
36✔
330
                return htlc.Settle.Preimage, &htlc.Route, nil
16✔
331
        }
16✔
332

333
        // Otherwise return the payment failure reason.
334
        return [32]byte{}, nil, *failure
8✔
335
}
336

337
// checkContext checks whether the payment context has been canceled.
338
// Cancellation occurs manually or if the context times out.
339
func (p *paymentLifecycle) checkContext(ctx context.Context) error {
76✔
340
        select {
76✔
341
        case <-ctx.Done():
8✔
342
                // If the context was canceled, we'll mark the payment as
8✔
343
                // failed. There are two cases to distinguish here: Either a
8✔
344
                // user-provided timeout was reached, or the context was
8✔
345
                // canceled, either to a manual cancellation or due to an
8✔
346
                // unknown error.
8✔
347
                var reason channeldb.FailureReason
8✔
348
                if errors.Is(ctx.Err(), context.DeadlineExceeded) {
11✔
349
                        reason = channeldb.FailureReasonTimeout
3✔
350
                        log.Warnf("Payment attempt not completed before "+
3✔
351
                                "timeout, id=%s", p.identifier.String())
3✔
352
                } else {
8✔
353
                        reason = channeldb.FailureReasonCanceled
5✔
354
                        log.Warnf("Payment attempt context canceled, id=%s",
5✔
355
                                p.identifier.String())
5✔
356
                }
5✔
357

358
                // By marking the payment failed, depending on whether it has
359
                // inflight HTLCs or not, its status will now either be
360
                // `StatusInflight` or `StatusFailed`. In either case, no more
361
                // HTLCs will be attempted.
362
                err := p.router.cfg.Control.FailPayment(p.identifier, reason)
8✔
363
                if err != nil {
9✔
364
                        return fmt.Errorf("FailPayment got %w", err)
1✔
365
                }
1✔
366

367
        case <-p.router.quit:
2✔
368
                return fmt.Errorf("check payment timeout got: %w",
2✔
369
                        ErrRouterShuttingDown)
2✔
370

371
        // Fall through if we haven't hit our time limit.
372
        default:
70✔
373
        }
374

375
        return nil
73✔
376
}
377

378
// requestRoute is responsible for finding a route to be used to create an HTLC
379
// attempt.
380
func (p *paymentLifecycle) requestRoute(
381
        ps *channeldb.MPPaymentState) (*route.Route, error) {
38✔
382

38✔
383
        remainingFees := p.calcFeeBudget(ps.FeesPaid)
38✔
384

38✔
385
        // Query our payment session to construct a route.
38✔
386
        rt, err := p.paySession.RequestRoute(
38✔
387
                ps.RemainingAmt, remainingFees,
38✔
388
                uint32(ps.NumAttemptsInFlight), uint32(p.currentHeight),
38✔
389
                p.firstHopCustomRecords,
38✔
390
        )
38✔
391

38✔
392
        // Exit early if there's no error.
38✔
393
        if err == nil {
70✔
394
                return rt, nil
32✔
395
        }
32✔
396

397
        // Otherwise we need to handle the error.
398
        log.Warnf("Failed to find route for payment %v: %v", p.identifier, err)
10✔
399

10✔
400
        // If the error belongs to `noRouteError` set, it means a non-critical
10✔
401
        // error has happened during path finding, and we will mark the payment
10✔
402
        // failed with this reason. Otherwise, we'll return the critical error
10✔
403
        // found to abort the lifecycle.
10✔
404
        var routeErr noRouteError
10✔
405
        if !errors.As(err, &routeErr) {
12✔
406
                return nil, fmt.Errorf("requestRoute got: %w", err)
2✔
407
        }
2✔
408

409
        // It's the `paymentSession`'s responsibility to find a route for us
410
        // with the best effort. When it cannot find a path, we need to treat it
411
        // as a terminal condition and fail the payment no matter it has
412
        // inflight HTLCs or not.
413
        failureCode := routeErr.FailureReason()
8✔
414
        log.Warnf("Marking payment %v permanently failed with no route: %v",
8✔
415
                p.identifier, failureCode)
8✔
416

8✔
417
        err = p.router.cfg.Control.FailPayment(p.identifier, failureCode)
8✔
418
        if err != nil {
9✔
419
                return nil, fmt.Errorf("FailPayment got: %w", err)
1✔
420
        }
1✔
421

422
        // NOTE: we decide to not return the non-critical noRouteError here to
423
        // avoid terminating the payment lifecycle as there might be other
424
        // inflight HTLCs which we must wait for their results.
425
        return nil, nil
7✔
426
}
427

428
// stop signals any active shard goroutine to exit.
429
func (p *paymentLifecycle) stop() {
27✔
430
        close(p.quit)
27✔
431
}
27✔
432

433
// attemptResult holds the HTLC attempt and a possible error returned from
434
// sending it.
435
type attemptResult struct {
436
        // err is non-nil if a non-critical error was encountered when trying
437
        // to send the attempt, and we successfully updated the control tower
438
        // to reflect this error. This can be errors like not enough local
439
        // balance for the given route etc.
440
        err error
441

442
        // attempt is the attempt structure as recorded in the database.
443
        attempt *channeldb.HTLCAttempt
444
}
445

446
// collectResultAsync launches a goroutine that will wait for the result of the
447
// given HTLC attempt to be available then handle its result. Once received, it
448
// will send a nil error to channel `resultCollected` to indicate there's a
449
// result.
450
func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
26✔
451
        log.Debugf("Collecting result for attempt %v in payment %v",
26✔
452
                attempt.AttemptID, p.identifier)
26✔
453

26✔
454
        go func() {
52✔
455
                // Block until the result is available.
26✔
456
                _, err := p.collectResult(attempt)
26✔
457
                if err != nil {
30✔
458
                        log.Errorf("Error collecting result for attempt %v "+
4✔
459
                                "in payment %v: %v", attempt.AttemptID,
4✔
460
                                p.identifier, err)
4✔
461
                }
4✔
462

463
                log.Debugf("Result collected for attempt %v in payment %v",
26✔
464
                        attempt.AttemptID, p.identifier)
26✔
465

26✔
466
                // Once the result is collected, we signal it by writing the
26✔
467
                // error to `resultCollected`.
26✔
468
                select {
26✔
469
                // Send the signal or quit.
470
                case p.resultCollected <- err:
26✔
471

UNCOV
472
                case <-p.quit:
×
UNCOV
473
                        log.Debugf("Lifecycle exiting while collecting "+
×
UNCOV
474
                                "result for payment %v", p.identifier)
×
475

UNCOV
476
                case <-p.router.quit:
×
UNCOV
477
                        return
×
478
                }
479
        }()
480
}
481

482
// collectResult waits for the result for the given attempt to be available
483
// from the Switch, then records the attempt outcome with the control tower.
484
// An attemptResult is returned, indicating the final outcome of this HTLC
485
// attempt.
486
func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
487
        *attemptResult, error) {
38✔
488

38✔
489
        log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt))
38✔
490

38✔
491
        // We'll retrieve the hash specific to this shard from the
38✔
492
        // shardTracker, since it will be needed to regenerate the circuit
38✔
493
        // below.
38✔
494
        hash, err := p.shardTracker.GetHash(attempt.AttemptID)
38✔
495
        if err != nil {
38✔
496
                return p.failAttempt(attempt.AttemptID, err)
×
497
        }
×
498

499
        // Regenerate the circuit for this attempt.
500
        _, circuit, err := generateSphinxPacket(
38✔
501
                &attempt.Route, hash[:], attempt.SessionKey(),
38✔
502
        )
38✔
503
        // TODO(yy): We generate this circuit to create the error decryptor,
38✔
504
        // which is then used in htlcswitch as the deobfuscator to decode the
38✔
505
        // error from `UpdateFailHTLC`. However, suppose it's an
38✔
506
        // `UpdateFulfillHTLC` message yet for some reason the sphinx packet is
38✔
507
        // failed to be generated, we'd miss settling it. This means we should
38✔
508
        // give it a second chance to try the settlement path in case
38✔
509
        // `GetAttemptResult` gives us back the preimage. And move the circuit
38✔
510
        // creation into htlcswitch so it's only constructed when there's a
38✔
511
        // failure message we need to decode.
38✔
512
        if err != nil {
38✔
513
                log.Debugf("Unable to generate circuit for attempt %v: %v",
×
514
                        attempt.AttemptID, err)
×
515

×
516
                return p.failAttempt(attempt.AttemptID, err)
×
517
        }
×
518

519
        // Using the created circuit, initialize the error decrypter, so we can
520
        // parse+decode any failures incurred by this payment within the
521
        // switch.
522
        errorDecryptor := &htlcswitch.SphinxErrorDecrypter{
38✔
523
                OnionErrorDecrypter: sphinx.NewOnionErrorDecrypter(circuit),
38✔
524
        }
38✔
525

38✔
526
        // Now ask the switch to return the result of the payment when
38✔
527
        // available.
38✔
528
        //
38✔
529
        // TODO(yy): consider using htlcswitch to create the `errorDecryptor`
38✔
530
        // since the htlc is already in db. This will also make the interface
38✔
531
        // `PaymentAttemptDispatcher` deeper and easier to use. Moreover, we'd
38✔
532
        // only create the decryptor when received a failure, further saving us
38✔
533
        // a few CPU cycles.
38✔
534
        resultChan, err := p.router.cfg.Payer.GetAttemptResult(
38✔
535
                attempt.AttemptID, p.identifier, errorDecryptor,
38✔
536
        )
38✔
537
        // Handle the switch error.
38✔
538
        if err != nil {
39✔
539
                log.Errorf("Failed getting result for attemptID %d "+
1✔
540
                        "from switch: %v", attempt.AttemptID, err)
1✔
541

1✔
542
                return p.handleSwitchErr(attempt, err)
1✔
543
        }
1✔
544

545
        // The switch knows about this payment, we'll wait for a result to be
546
        // available.
547
        var (
37✔
548
                result *htlcswitch.PaymentResult
37✔
549
                ok     bool
37✔
550
        )
37✔
551

37✔
552
        select {
37✔
553
        case result, ok = <-resultChan:
35✔
554
                if !ok {
40✔
555
                        return nil, htlcswitch.ErrSwitchExiting
5✔
556
                }
5✔
557

558
        case <-p.quit:
1✔
559
                return nil, ErrPaymentLifecycleExiting
1✔
560

561
        case <-p.router.quit:
1✔
562
                return nil, ErrRouterShuttingDown
1✔
563
        }
564

565
        // In case of a payment failure, fail the attempt with the control
566
        // tower and return.
567
        if result.Error != nil {
54✔
568
                return p.handleSwitchErr(attempt, result.Error)
20✔
569
        }
20✔
570

571
        // We successfully got a payment result back from the switch.
572
        log.Debugf("Payment %v succeeded with pid=%v",
18✔
573
                p.identifier, attempt.AttemptID)
18✔
574

18✔
575
        // Report success to mission control.
18✔
576
        err = p.router.cfg.MissionControl.ReportPaymentSuccess(
18✔
577
                attempt.AttemptID, &attempt.Route,
18✔
578
        )
18✔
579
        if err != nil {
18✔
580
                log.Errorf("Error reporting payment success to mc: %v", err)
×
581
        }
×
582

583
        // In case of success we atomically store settle result to the DB move
584
        // the shard to the settled state.
585
        htlcAttempt, err := p.router.cfg.Control.SettleAttempt(
18✔
586
                p.identifier, attempt.AttemptID,
18✔
587
                &channeldb.HTLCSettleInfo{
18✔
588
                        Preimage:   result.Preimage,
18✔
589
                        SettleTime: p.router.cfg.Clock.Now(),
18✔
590
                },
18✔
591
        )
18✔
592
        if err != nil {
19✔
593
                log.Errorf("Error settling attempt %v for payment %v with "+
1✔
594
                        "preimage %v: %v", attempt.AttemptID, p.identifier,
1✔
595
                        result.Preimage, err)
1✔
596

1✔
597
                // We won't mark the attempt as failed since we already have
1✔
598
                // the preimage.
1✔
599
                return nil, err
1✔
600
        }
1✔
601

602
        return &attemptResult{
17✔
603
                attempt: htlcAttempt,
17✔
604
        }, nil
17✔
605
}
606

607
// registerAttempt is responsible for creating and saving an HTLC attempt in db
608
// by using the route info provided. The `remainingAmt` is used to decide
609
// whether this is the last attempt.
610
func (p *paymentLifecycle) registerAttempt(rt *route.Route,
611
        remainingAmt lnwire.MilliSatoshi) (*channeldb.HTLCAttempt, error) {
40✔
612

40✔
613
        // If this route will consume the last remaining amount to send
40✔
614
        // to the receiver, this will be our last shard (for now).
40✔
615
        isLastAttempt := rt.ReceiverAmt() == remainingAmt
40✔
616

40✔
617
        // Using the route received from the payment session, create a new
40✔
618
        // shard to send.
40✔
619
        attempt, err := p.createNewPaymentAttempt(rt, isLastAttempt)
40✔
620
        if err != nil {
41✔
621
                return nil, err
1✔
622
        }
1✔
623

624
        // Before sending this HTLC to the switch, we checkpoint the fresh
625
        // paymentID and route to the DB. This lets us know on startup the ID
626
        // of the payment that we attempted to send, such that we can query the
627
        // Switch for its whereabouts. The route is needed to handle the result
628
        // when it eventually comes back.
629
        err = p.router.cfg.Control.RegisterAttempt(
39✔
630
                p.identifier, &attempt.HTLCAttemptInfo,
39✔
631
        )
39✔
632

39✔
633
        return attempt, err
39✔
634
}
635

636
// createNewPaymentAttempt creates a new payment attempt from the given route.
637
func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route,
638
        lastShard bool) (*channeldb.HTLCAttempt, error) {
40✔
639

40✔
640
        // Generate a new key to be used for this attempt.
40✔
641
        sessionKey, err := generateNewSessionKey()
40✔
642
        if err != nil {
40✔
643
                return nil, err
×
644
        }
×
645

646
        // We generate a new, unique payment ID that we will use for
647
        // this HTLC.
648
        attemptID, err := p.router.cfg.NextPaymentID()
40✔
649
        if err != nil {
40✔
650
                return nil, err
×
651
        }
×
652

653
        // Request a new shard from the ShardTracker. If this is an AMP
654
        // payment, and this is the last shard, the outstanding shards together
655
        // with this one will be enough for the receiver to derive all HTLC
656
        // preimages. If this a non-AMP payment, the ShardTracker will return a
657
        // simple shard with the payment's static payment hash.
658
        shard, err := p.shardTracker.NewShard(attemptID, lastShard)
40✔
659
        if err != nil {
41✔
660
                return nil, err
1✔
661
        }
1✔
662

663
        // If this shard carries MPP or AMP options, add them to the last hop
664
        // on the route.
665
        hop := rt.Hops[len(rt.Hops)-1]
39✔
666
        if shard.MPP() != nil {
47✔
667
                hop.MPP = shard.MPP()
8✔
668
        }
8✔
669

670
        if shard.AMP() != nil {
43✔
671
                hop.AMP = shard.AMP()
4✔
672
        }
4✔
673

674
        hash := shard.Hash()
39✔
675

39✔
676
        // We now have all the information needed to populate the current
39✔
677
        // attempt information.
39✔
678
        attempt := channeldb.NewHtlcAttempt(
39✔
679
                attemptID, sessionKey, *rt, p.router.cfg.Clock.Now(), &hash,
39✔
680
        )
39✔
681

39✔
682
        return attempt, nil
39✔
683
}
684

685
// sendAttempt attempts to send the current attempt to the switch to complete
686
// the payment. If this attempt fails, then we'll continue on to the next
687
// available route.
688
func (p *paymentLifecycle) sendAttempt(
689
        attempt *channeldb.HTLCAttempt) (*attemptResult, error) {
39✔
690

39✔
691
        log.Debugf("Sending HTLC attempt(id=%v, total_amt=%v, first_hop_amt=%d"+
39✔
692
                ") for payment %v", attempt.AttemptID,
39✔
693
                attempt.Route.TotalAmount, attempt.Route.FirstHopAmount.Val,
39✔
694
                p.identifier)
39✔
695

39✔
696
        rt := attempt.Route
39✔
697

39✔
698
        // Construct the first hop.
39✔
699
        firstHop := lnwire.NewShortChanIDFromInt(rt.Hops[0].ChannelID)
39✔
700

39✔
701
        // Craft an HTLC packet to send to the htlcswitch. The metadata within
39✔
702
        // this packet will be used to route the payment through the network,
39✔
703
        // starting with the first-hop.
39✔
704
        htlcAdd := &lnwire.UpdateAddHTLC{
39✔
705
                Amount:        rt.FirstHopAmount.Val.Int(),
39✔
706
                Expiry:        rt.TotalTimeLock,
39✔
707
                PaymentHash:   *attempt.Hash,
39✔
708
                CustomRecords: rt.FirstHopWireCustomRecords,
39✔
709
        }
39✔
710

39✔
711
        // Generate the raw encoded sphinx packet to be included along
39✔
712
        // with the htlcAdd message that we send directly to the
39✔
713
        // switch.
39✔
714
        onionBlob, _, err := generateSphinxPacket(
39✔
715
                &rt, attempt.Hash[:], attempt.SessionKey(),
39✔
716
        )
39✔
717
        if err != nil {
40✔
718
                log.Errorf("Failed to create onion blob: attempt=%d in "+
1✔
719
                        "payment=%v, err:%v", attempt.AttemptID,
1✔
720
                        p.identifier, err)
1✔
721

1✔
722
                return p.failAttempt(attempt.AttemptID, err)
1✔
723
        }
1✔
724

725
        copy(htlcAdd.OnionBlob[:], onionBlob)
38✔
726

38✔
727
        // Send it to the Switch. When this method returns we assume
38✔
728
        // the Switch successfully has persisted the payment attempt,
38✔
729
        // such that we can resume waiting for the result after a
38✔
730
        // restart.
38✔
731
        err = p.router.cfg.Payer.SendHTLC(firstHop, attempt.AttemptID, htlcAdd)
38✔
732
        if err != nil {
47✔
733
                log.Errorf("Failed sending attempt %d for payment %v to "+
9✔
734
                        "switch: %v", attempt.AttemptID, p.identifier, err)
9✔
735

9✔
736
                return p.handleSwitchErr(attempt, err)
9✔
737
        }
9✔
738

739
        log.Debugf("Attempt %v for payment %v successfully sent to switch, "+
33✔
740
                "route: %v", attempt.AttemptID, p.identifier, &attempt.Route)
33✔
741

33✔
742
        return &attemptResult{
33✔
743
                attempt: attempt,
33✔
744
        }, nil
33✔
745
}
746

747
// amendFirstHopData is a function that calls the traffic shaper to allow it to
748
// add custom records to the outgoing HTLC and also adjust the amount if
749
// needed.
750
func (p *paymentLifecycle) amendFirstHopData(rt *route.Route) error {
40✔
751
        // The first hop amount on the route is the full route amount if not
40✔
752
        // overwritten by the traffic shaper. So we set the initial value now
40✔
753
        // and potentially overwrite it later.
40✔
754
        rt.FirstHopAmount = tlv.NewRecordT[tlv.TlvType0](
40✔
755
                tlv.NewBigSizeT(rt.TotalAmount),
40✔
756
        )
40✔
757

40✔
758
        // By default, we set the first hop custom records to the initial
40✔
759
        // value requested by the RPC. The traffic shaper may overwrite this
40✔
760
        // value.
40✔
761
        rt.FirstHopWireCustomRecords = p.firstHopCustomRecords
40✔
762

40✔
763
        // extraDataRequest is a helper struct to pass the custom records and
40✔
764
        // amount back from the traffic shaper.
40✔
765
        type extraDataRequest struct {
40✔
766
                customRecords fn.Option[lnwire.CustomRecords]
40✔
767

40✔
768
                amount fn.Option[lnwire.MilliSatoshi]
40✔
769
        }
40✔
770

40✔
771
        // If a hook exists that may affect our outgoing message, we call it now
40✔
772
        // and apply its side effects to the UpdateAddHTLC message.
40✔
773
        result, err := fn.MapOptionZ(
40✔
774
                p.router.cfg.TrafficShaper,
40✔
775
                //nolint:ll
40✔
776
                func(ts htlcswitch.AuxTrafficShaper) fn.Result[extraDataRequest] {
76✔
777
                        newAmt, newRecords, err := ts.ProduceHtlcExtraData(
36✔
778
                                rt.TotalAmount, p.firstHopCustomRecords,
36✔
779
                        )
36✔
780
                        if err != nil {
36✔
781
                                return fn.Err[extraDataRequest](err)
×
782
                        }
×
783

784
                        // Make sure we only received valid records.
785
                        if err := newRecords.Validate(); err != nil {
36✔
786
                                return fn.Err[extraDataRequest](err)
×
787
                        }
×
788

789
                        log.Debugf("Aux traffic shaper returned custom "+
36✔
790
                                "records %v and amount %d msat for HTLC",
36✔
791
                                spew.Sdump(newRecords), newAmt)
36✔
792

36✔
793
                        return fn.Ok(extraDataRequest{
36✔
794
                                customRecords: fn.Some(newRecords),
36✔
795
                                amount:        fn.Some(newAmt),
36✔
796
                        })
36✔
797
                },
798
        ).Unpack()
799
        if err != nil {
40✔
800
                return fmt.Errorf("traffic shaper failed to produce extra "+
×
801
                        "data: %w", err)
×
802
        }
×
803

804
        // Apply the side effects to the UpdateAddHTLC message.
805
        result.customRecords.WhenSome(func(records lnwire.CustomRecords) {
76✔
806
                rt.FirstHopWireCustomRecords = records
36✔
807
        })
36✔
808
        result.amount.WhenSome(func(amount lnwire.MilliSatoshi) {
76✔
809
                rt.FirstHopAmount = tlv.NewRecordT[tlv.TlvType0](
36✔
810
                        tlv.NewBigSizeT(amount),
36✔
811
                )
36✔
812
        })
36✔
813

814
        return nil
40✔
815
}
816

817
// failAttemptAndPayment fails both the payment and its attempt via the
818
// router's control tower, which marks the payment as failed in db.
819
func (p *paymentLifecycle) failPaymentAndAttempt(
820
        attemptID uint64, reason *channeldb.FailureReason,
821
        sendErr error) (*attemptResult, error) {
9✔
822

9✔
823
        log.Errorf("Payment %v failed: final_outcome=%v, raw_err=%v",
9✔
824
                p.identifier, *reason, sendErr)
9✔
825

9✔
826
        // Fail the payment via control tower.
9✔
827
        //
9✔
828
        // NOTE: we must fail the payment first before failing the attempt.
9✔
829
        // Otherwise, once the attempt is marked as failed, another goroutine
9✔
830
        // might make another attempt while we are failing the payment.
9✔
831
        err := p.router.cfg.Control.FailPayment(p.identifier, *reason)
9✔
832
        if err != nil {
9✔
833
                log.Errorf("Unable to fail payment: %v", err)
×
834
                return nil, err
×
835
        }
×
836

837
        // Fail the attempt.
838
        return p.failAttempt(attemptID, sendErr)
9✔
839
}
840

841
// handleSwitchErr inspects the given error from the Switch and determines
842
// whether we should make another payment attempt, or if it should be
843
// considered a terminal error. Terminal errors will be recorded with the
844
// control tower. It analyzes the sendErr for the payment attempt received from
845
// the switch and updates mission control and/or channel policies. Depending on
846
// the error type, the error is either the final outcome of the payment or we
847
// need to continue with an alternative route. A final outcome is indicated by
848
// a non-nil reason value.
849
func (p *paymentLifecycle) handleSwitchErr(attempt *channeldb.HTLCAttempt,
850
        sendErr error) (*attemptResult, error) {
26✔
851

26✔
852
        internalErrorReason := channeldb.FailureReasonError
26✔
853
        attemptID := attempt.AttemptID
26✔
854

26✔
855
        // reportAndFail is a helper closure that reports the failure to the
26✔
856
        // mission control, which helps us to decide whether we want to retry
26✔
857
        // the payment or not. If a non nil reason is returned from mission
26✔
858
        // control, it will further fail the payment via control tower.
26✔
859
        reportAndFail := func(srcIdx *int,
26✔
860
                msg lnwire.FailureMessage) (*attemptResult, error) {
49✔
861

23✔
862
                // Report outcome to mission control.
23✔
863
                reason, err := p.router.cfg.MissionControl.ReportPaymentFail(
23✔
864
                        attemptID, &attempt.Route, srcIdx, msg,
23✔
865
                )
23✔
866
                if err != nil {
23✔
867
                        log.Errorf("Error reporting payment result to mc: %v",
×
868
                                err)
×
869

×
870
                        reason = &internalErrorReason
×
871
                }
×
872

873
                // Fail the attempt only if there's no reason.
874
                if reason == nil {
44✔
875
                        // Fail the attempt.
21✔
876
                        return p.failAttempt(attemptID, sendErr)
21✔
877
                }
21✔
878

879
                // Otherwise fail both the payment and the attempt.
880
                return p.failPaymentAndAttempt(attemptID, reason, sendErr)
6✔
881
        }
882

883
        // If this attempt ID is unknown to the Switch, it means it was never
884
        // checkpointed and forwarded by the switch before a restart. In this
885
        // case we can safely send a new payment attempt, and wait for its
886
        // result to be available.
887
        if errors.Is(sendErr, htlcswitch.ErrPaymentIDNotFound) {
26✔
888
                log.Debugf("Attempt ID %v for payment %v not found in the "+
×
889
                        "Switch, retrying.", attempt.AttemptID, p.identifier)
×
890

×
891
                return p.failAttempt(attemptID, sendErr)
×
892
        }
×
893

894
        if errors.Is(sendErr, htlcswitch.ErrUnreadableFailureMessage) {
27✔
895
                log.Warn("Unreadable failure when sending htlc: id=%v, hash=%v",
1✔
896
                        attempt.AttemptID, attempt.Hash)
1✔
897

1✔
898
                // Since this error message cannot be decrypted, we will send a
1✔
899
                // nil error message to our mission controller and fail the
1✔
900
                // payment.
1✔
901
                return reportAndFail(nil, nil)
1✔
902
        }
1✔
903

904
        // If the error is a ClearTextError, we have received a valid wire
905
        // failure message, either from our own outgoing link or from a node
906
        // down the route. If the error is not related to the propagation of
907
        // our payment, we can stop trying because an internal error has
908
        // occurred.
909
        var rtErr htlcswitch.ClearTextError
25✔
910
        ok := errors.As(sendErr, &rtErr)
25✔
911
        if !ok {
28✔
912
                return p.failPaymentAndAttempt(
3✔
913
                        attemptID, &internalErrorReason, sendErr,
3✔
914
                )
3✔
915
        }
3✔
916

917
        // failureSourceIdx is the index of the node that the failure occurred
918
        // at. If the ClearTextError received is not a ForwardingError the
919
        // payment error occurred at our node, so we leave this value as 0
920
        // to indicate that the failure occurred locally. If the error is a
921
        // ForwardingError, it did not originate at our node, so we set
922
        // failureSourceIdx to the index of the node where the failure occurred.
923
        failureSourceIdx := 0
22✔
924
        var source *htlcswitch.ForwardingError
22✔
925
        ok = errors.As(rtErr, &source)
22✔
926
        if ok {
44✔
927
                failureSourceIdx = source.FailureSourceIdx
22✔
928
        }
22✔
929

930
        // Extract the wire failure and apply channel update if it contains one.
931
        // If we received an unknown failure message from a node along the
932
        // route, the failure message will be nil.
933
        failureMessage := rtErr.WireMessage()
22✔
934
        err := p.handleFailureMessage(
22✔
935
                &attempt.Route, failureSourceIdx, failureMessage,
22✔
936
        )
22✔
937
        if err != nil {
22✔
938
                return p.failPaymentAndAttempt(
×
939
                        attemptID, &internalErrorReason, sendErr,
×
940
                )
×
941
        }
×
942

943
        log.Tracef("Node=%v reported failure when sending htlc",
22✔
944
                failureSourceIdx)
22✔
945

22✔
946
        return reportAndFail(&failureSourceIdx, failureMessage)
22✔
947
}
948

949
// handleFailureMessage tries to apply a channel update present in the failure
950
// message if any.
951
func (p *paymentLifecycle) handleFailureMessage(rt *route.Route,
952
        errorSourceIdx int, failure lnwire.FailureMessage) error {
22✔
953

22✔
954
        if failure == nil {
23✔
955
                return nil
1✔
956
        }
1✔
957

958
        // It makes no sense to apply our own channel updates.
959
        if errorSourceIdx == 0 {
25✔
960
                log.Errorf("Channel update of ourselves received")
4✔
961

4✔
962
                return nil
4✔
963
        }
4✔
964

965
        // Extract channel update if the error contains one.
966
        update := p.router.extractChannelUpdate(failure)
21✔
967
        if update == nil {
34✔
968
                return nil
13✔
969
        }
13✔
970

971
        // Parse pubkey to allow validation of the channel update. This should
972
        // always succeed, otherwise there is something wrong in our
973
        // implementation. Therefore, return an error.
974
        errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
12✔
975
        errSource, err := btcec.ParsePubKey(errVertex[:])
12✔
976
        if err != nil {
12✔
977
                log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
×
978
                        errorSourceIdx, errVertex)
×
979

×
980
                return err
×
981
        }
×
982

983
        var (
12✔
984
                isAdditionalEdge bool
12✔
985
                policy           *models.CachedEdgePolicy
12✔
986
        )
12✔
987

12✔
988
        // Before we apply the channel update, we need to decide whether the
12✔
989
        // update is for additional (ephemeral) edge or normal edge stored in
12✔
990
        // db.
12✔
991
        //
12✔
992
        // Note: the p.paySession might be nil here if it's called inside
12✔
993
        // SendToRoute where there's no payment lifecycle.
12✔
994
        if p.paySession != nil {
21✔
995
                policy = p.paySession.GetAdditionalEdgePolicy(
9✔
996
                        errSource, update.ShortChannelID.ToUint64(),
9✔
997
                )
9✔
998
                if policy != nil {
15✔
999
                        isAdditionalEdge = true
6✔
1000
                }
6✔
1001
        }
1002

1003
        // Apply channel update to additional edge policy.
1004
        if isAdditionalEdge {
18✔
1005
                if !p.paySession.UpdateAdditionalEdge(
6✔
1006
                        update, errSource, policy) {
6✔
1007

×
1008
                        log.Debugf("Invalid channel update received: node=%v",
×
1009
                                errVertex)
×
1010
                }
×
1011
                return nil
6✔
1012
        }
1013

1014
        // Apply channel update to the channel edge policy in our db.
1015
        if !p.router.cfg.ApplyChannelUpdate(update) {
16✔
1016
                log.Debugf("Invalid channel update received: node=%v",
6✔
1017
                        errVertex)
6✔
1018
        }
6✔
1019
        return nil
10✔
1020
}
1021

1022
// failAttempt calls control tower to fail the current payment attempt.
1023
func (p *paymentLifecycle) failAttempt(attemptID uint64,
1024
        sendError error) (*attemptResult, error) {
27✔
1025

27✔
1026
        log.Warnf("Attempt %v for payment %v failed: %v", attemptID,
27✔
1027
                p.identifier, sendError)
27✔
1028

27✔
1029
        failInfo := marshallError(
27✔
1030
                sendError,
27✔
1031
                p.router.cfg.Clock.Now(),
27✔
1032
        )
27✔
1033

27✔
1034
        // Now that we are failing this payment attempt, cancel the shard with
27✔
1035
        // the ShardTracker such that it can derive the correct hash for the
27✔
1036
        // next attempt.
27✔
1037
        if err := p.shardTracker.CancelShard(attemptID); err != nil {
27✔
1038
                return nil, err
×
1039
        }
×
1040

1041
        attempt, err := p.router.cfg.Control.FailAttempt(
27✔
1042
                p.identifier, attemptID, failInfo,
27✔
1043
        )
27✔
1044
        if err != nil {
30✔
1045
                return nil, err
3✔
1046
        }
3✔
1047

1048
        return &attemptResult{
24✔
1049
                attempt: attempt,
24✔
1050
                err:     sendError,
24✔
1051
        }, nil
24✔
1052
}
1053

1054
// marshallError marshall an error as received from the switch to a structure
1055
// that is suitable for database storage.
1056
func marshallError(sendError error, time time.Time) *channeldb.HTLCFailInfo {
27✔
1057
        response := &channeldb.HTLCFailInfo{
27✔
1058
                FailTime: time,
27✔
1059
        }
27✔
1060

27✔
1061
        switch {
27✔
1062
        case errors.Is(sendError, htlcswitch.ErrPaymentIDNotFound):
×
1063
                response.Reason = channeldb.HTLCFailInternal
×
1064
                return response
×
1065

1066
        case errors.Is(sendError, htlcswitch.ErrUnreadableFailureMessage):
1✔
1067
                response.Reason = channeldb.HTLCFailUnreadable
1✔
1068
                return response
1✔
1069
        }
1070

1071
        var rtErr htlcswitch.ClearTextError
26✔
1072
        ok := errors.As(sendError, &rtErr)
26✔
1073
        if !ok {
30✔
1074
                response.Reason = channeldb.HTLCFailInternal
4✔
1075
                return response
4✔
1076
        }
4✔
1077

1078
        message := rtErr.WireMessage()
22✔
1079
        if message != nil {
43✔
1080
                response.Reason = channeldb.HTLCFailMessage
21✔
1081
                response.Message = message
21✔
1082
        } else {
22✔
1083
                response.Reason = channeldb.HTLCFailUnknown
1✔
1084
        }
1✔
1085

1086
        // If the ClearTextError received is a ForwardingError, the error
1087
        // originated from a node along the route, not locally on our outgoing
1088
        // link. We set failureSourceIdx to the index of the node where the
1089
        // failure occurred. If the error is not a ForwardingError, the failure
1090
        // occurred at our node, so we leave the index as 0 to indicate that
1091
        // we failed locally.
1092
        var fErr *htlcswitch.ForwardingError
22✔
1093
        ok = errors.As(rtErr, &fErr)
22✔
1094
        if ok {
44✔
1095
                response.FailureSourceIndex = uint32(fErr.FailureSourceIdx)
22✔
1096
        }
22✔
1097

1098
        return response
22✔
1099
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc