• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with it's quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived context that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.65
/htlcswitch/interceptable_switch.go
1
package htlcswitch
2

3
import (
4
        "crypto/sha256"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/go-errors/errors"
10
        "github.com/lightningnetwork/lnd/chainntnfs"
11
        "github.com/lightningnetwork/lnd/fn/v2"
12
        "github.com/lightningnetwork/lnd/graph/db/models"
13
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
14
        "github.com/lightningnetwork/lnd/lntypes"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
20
        // ErrFwdNotExists is an error returned when the caller tries to resolve
21
        // a forward that doesn't exist anymore.
22
        ErrFwdNotExists = errors.New("forward does not exist")
23

24
        // ErrUnsupportedFailureCode when processing of an unsupported failure
25
        // code is attempted.
26
        ErrUnsupportedFailureCode = errors.New("unsupported failure code")
27

28
        errBlockStreamStopped = errors.New("block epoch stream stopped")
29
)
30

31
// InterceptableSwitch is an implementation of ForwardingSwitch interface.
32
// This implementation is used like a proxy that wraps the switch and
33
// intercepts forward requests. A reference to the Switch is held in order
34
// to communicate back the interception result where the options are:
35
// Resume - forwards the original request to the switch as is.
36
// Settle - routes UpdateFulfillHTLC to the originating link.
37
// Fail - routes UpdateFailHTLC to the originating link.
38
type InterceptableSwitch struct {
39
        started atomic.Bool
40
        stopped atomic.Bool
41

42
        // htlcSwitch is the underline switch
43
        htlcSwitch *Switch
44

45
        // intercepted is where we stream all intercepted packets coming from
46
        // the switch.
47
        intercepted chan *interceptedPackets
48

49
        // resolutionChan is where we stream all responses coming from the
50
        // interceptor client.
51
        resolutionChan chan *fwdResolution
52

53
        onchainIntercepted chan InterceptedForward
54

55
        // interceptorRegistration is a channel that we use to synchronize
56
        // client connect and disconnect.
57
        interceptorRegistration chan ForwardInterceptor
58

59
        // requireInterceptor indicates whether processing should block if no
60
        // interceptor is connected.
61
        requireInterceptor bool
62

63
        // interceptor is the handler for intercepted packets.
64
        interceptor ForwardInterceptor
65

66
        // heldHtlcSet keeps track of outstanding intercepted forwards.
67
        heldHtlcSet *heldHtlcSet
68

69
        // cltvRejectDelta defines the number of blocks before the expiry of the
70
        // htlc where we no longer intercept it and instead cancel it back.
71
        cltvRejectDelta uint32
72

73
        // cltvInterceptDelta defines the number of blocks before the expiry of
74
        // the htlc where we don't intercept anymore. This value must be greater
75
        // than CltvRejectDelta, because we don't want to offer htlcs to the
76
        // interceptor client for which there is no time left to resolve them
77
        // anymore.
78
        cltvInterceptDelta uint32
79

80
        // notifier is an instance of a chain notifier that we'll use to signal
81
        // the switch when a new block has arrived.
82
        notifier chainntnfs.ChainNotifier
83

84
        // blockEpochStream is an active block epoch event stream backed by an
85
        // active ChainNotifier instance. This will be used to retrieve the
86
        // latest height of the chain.
87
        blockEpochStream *chainntnfs.BlockEpochEvent
88

89
        // currentHeight is the currently best known height.
90
        currentHeight int32
91

92
        wg   sync.WaitGroup
93
        quit chan struct{}
94
}
95

96
type interceptedPackets struct {
97
        packets  []*htlcPacket
98
        linkQuit <-chan struct{}
99
        isReplay bool
100
}
101

102
// FwdAction defines the various resolution types.
103
type FwdAction int
104

105
const (
106
        // FwdActionResume forwards the intercepted packet to the switch.
107
        FwdActionResume FwdAction = iota
108

109
        // FwdActionSettle settles the intercepted packet with a preimage.
110
        FwdActionSettle
111

112
        // FwdActionFail fails the intercepted packet back to the sender.
113
        FwdActionFail
114

115
        // FwdActionResumeModified forwards the intercepted packet to the switch
116
        // with modifications.
117
        FwdActionResumeModified
118
)
119

120
// FwdResolution defines the action to be taken on an intercepted packet.
121
type FwdResolution struct {
122
        // Key is the incoming circuit key of the htlc.
123
        Key models.CircuitKey
124

125
        // Action is the action to take on the intercepted htlc.
126
        Action FwdAction
127

128
        // Preimage is the preimage that is to be used for settling if Action is
129
        // FwdActionSettle.
130
        Preimage lntypes.Preimage
131

132
        // InAmountMsat is the amount that is to be used for validating if
133
        // Action is FwdActionResumeModified.
134
        InAmountMsat fn.Option[lnwire.MilliSatoshi]
135

136
        // OutAmountMsat is the amount that is to be used for forwarding if
137
        // Action is FwdActionResumeModified.
138
        OutAmountMsat fn.Option[lnwire.MilliSatoshi]
139

140
        // OutWireCustomRecords is the custom records that are to be used for
141
        // forwarding if Action is FwdActionResumeModified.
142
        OutWireCustomRecords fn.Option[lnwire.CustomRecords]
143

144
        // FailureMessage is the encrypted failure message that is to be passed
145
        // back to the sender if action is FwdActionFail.
146
        FailureMessage []byte
147

148
        // FailureCode is the failure code that is to be passed back to the
149
        // sender if action is FwdActionFail.
150
        FailureCode lnwire.FailCode
151
}
152

153
type fwdResolution struct {
154
        resolution *FwdResolution
155
        errChan    chan error
156
}
157

158
// InterceptableSwitchConfig contains the configuration of InterceptableSwitch.
159
type InterceptableSwitchConfig struct {
160
        // Switch is a reference to the actual switch implementation that
161
        // packets get sent to on resume.
162
        Switch *Switch
163

164
        // Notifier is an instance of a chain notifier that we'll use to signal
165
        // the switch when a new block has arrived.
166
        Notifier chainntnfs.ChainNotifier
167

168
        // CltvRejectDelta defines the number of blocks before the expiry of the
169
        // htlc where we auto-fail an intercepted htlc to prevent channel
170
        // force-closure.
171
        CltvRejectDelta uint32
172

173
        // CltvInterceptDelta defines the number of blocks before the expiry of
174
        // the htlc where we don't intercept anymore. This value must be greater
175
        // than CltvRejectDelta, because we don't want to offer htlcs to the
176
        // interceptor client for which there is no time left to resolve them
177
        // anymore.
178
        CltvInterceptDelta uint32
179

180
        // RequireInterceptor indicates whether processing should block if no
181
        // interceptor is connected.
182
        RequireInterceptor bool
183
}
184

185
// NewInterceptableSwitch returns an instance of InterceptableSwitch.
186
func NewInterceptableSwitch(cfg *InterceptableSwitchConfig) (
187
        *InterceptableSwitch, error) {
24✔
188

24✔
189
        if cfg.CltvInterceptDelta <= cfg.CltvRejectDelta {
24✔
190
                return nil, fmt.Errorf("cltv intercept delta %v not greater "+
×
191
                        "than cltv reject delta %v",
×
192
                        cfg.CltvInterceptDelta, cfg.CltvRejectDelta)
×
193
        }
×
194

195
        return &InterceptableSwitch{
24✔
196
                htlcSwitch:              cfg.Switch,
24✔
197
                intercepted:             make(chan *interceptedPackets),
24✔
198
                onchainIntercepted:      make(chan InterceptedForward),
24✔
199
                interceptorRegistration: make(chan ForwardInterceptor),
24✔
200
                heldHtlcSet:             newHeldHtlcSet(),
24✔
201
                resolutionChan:          make(chan *fwdResolution),
24✔
202
                requireInterceptor:      cfg.RequireInterceptor,
24✔
203
                cltvRejectDelta:         cfg.CltvRejectDelta,
24✔
204
                cltvInterceptDelta:      cfg.CltvInterceptDelta,
24✔
205
                notifier:                cfg.Notifier,
24✔
206

24✔
207
                quit: make(chan struct{}),
24✔
208
        }, nil
24✔
209
}
210

211
// SetInterceptor sets the ForwardInterceptor to be used. A nil argument
212
// unregisters the current interceptor.
213
func (s *InterceptableSwitch) SetInterceptor(
214
        interceptor ForwardInterceptor) {
9✔
215

9✔
216
        // Synchronize setting the handler with the main loop to prevent race
9✔
217
        // conditions.
9✔
218
        select {
9✔
219
        case s.interceptorRegistration <- interceptor:
9✔
220

221
        case <-s.quit:
×
222
        }
223
}
224

225
func (s *InterceptableSwitch) Start() error {
5✔
226
        log.Info("InterceptableSwitch starting...")
5✔
227

5✔
228
        if s.started.Swap(true) {
5✔
229
                return fmt.Errorf("InterceptableSwitch started more than once")
×
230
        }
×
231

232
        blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
5✔
233
        if err != nil {
5✔
234
                return err
×
235
        }
×
236
        s.blockEpochStream = blockEpochStream
5✔
237

5✔
238
        s.wg.Add(1)
5✔
239
        go func() {
10✔
240
                defer s.wg.Done()
5✔
241

5✔
242
                err := s.run()
5✔
243
                if err != nil {
5✔
244
                        log.Errorf("InterceptableSwitch stopped: %v", err)
×
245
                }
×
246
        }()
247

248
        log.Debug("InterceptableSwitch started")
5✔
249

5✔
250
        return nil
5✔
251
}
252

253
func (s *InterceptableSwitch) Stop() error {
4✔
254
        log.Info("InterceptableSwitch shutting down...")
4✔
255

4✔
256
        if s.stopped.Swap(true) {
4✔
257
                return fmt.Errorf("InterceptableSwitch stopped more than once")
×
258
        }
×
259

260
        close(s.quit)
4✔
261
        s.wg.Wait()
4✔
262

4✔
263
        // We need to check whether the start routine run and initialized the
4✔
264
        // `blockEpochStream`.
4✔
265
        if s.blockEpochStream != nil {
8✔
266
                s.blockEpochStream.Cancel()
4✔
267
        }
4✔
268

269
        log.Debug("InterceptableSwitch shutdown complete")
4✔
270

4✔
271
        return nil
4✔
272
}
273

274
func (s *InterceptableSwitch) run() error {
5✔
275
        // The block epoch stream will immediately stream the current height.
5✔
276
        // Read it out here.
5✔
277
        select {
5✔
278
        case currentBlock, ok := <-s.blockEpochStream.Epochs:
5✔
279
                if !ok {
5✔
280
                        return errBlockStreamStopped
×
281
                }
×
282
                s.currentHeight = currentBlock.Height
5✔
283

284
        case <-s.quit:
×
285
                return nil
×
286
        }
287

288
        log.Debugf("InterceptableSwitch running: height=%v, "+
5✔
289
                "requireInterceptor=%v", s.currentHeight, s.requireInterceptor)
5✔
290

5✔
291
        for {
43✔
292
                select {
38✔
293
                // An interceptor registration or de-registration came in.
294
                case interceptor := <-s.interceptorRegistration:
9✔
295
                        s.setInterceptor(interceptor)
9✔
296

297
                case packets := <-s.intercepted:
16✔
298
                        var notIntercepted []*htlcPacket
16✔
299
                        for _, p := range packets.packets {
32✔
300
                                intercepted, err := s.interceptForward(
16✔
301
                                        p, packets.isReplay,
16✔
302
                                )
16✔
303
                                if err != nil {
16✔
304
                                        return err
×
305
                                }
×
306

307
                                if !intercepted {
20✔
308
                                        notIntercepted = append(
4✔
309
                                                notIntercepted, p,
4✔
310
                                        )
4✔
311
                                }
4✔
312
                        }
313
                        err := s.htlcSwitch.ForwardPackets(
16✔
314
                                packets.linkQuit, notIntercepted...,
16✔
315
                        )
16✔
316
                        if err != nil {
16✔
317
                                log.Errorf("Cannot forward packets: %v", err)
×
318
                        }
×
319

320
                case fwd := <-s.onchainIntercepted:
×
321
                        // For on-chain interceptions, we don't know if it has
×
322
                        // already been offered before. This information is in
×
323
                        // the forwarding package which isn't easily accessible
×
324
                        // from contractcourt. It is likely though that it was
×
325
                        // already intercepted in the off-chain flow. And even
×
326
                        // if not, it is safe to signal replay so that we won't
×
327
                        // unexpectedly skip over this htlc.
×
328
                        if _, err := s.forward(fwd, true); err != nil {
×
329
                                return err
×
330
                        }
×
331

332
                case res := <-s.resolutionChan:
7✔
333
                        res.errChan <- s.resolve(res.resolution)
7✔
334

335
                case currentBlock, ok := <-s.blockEpochStream.Epochs:
1✔
336
                        if !ok {
1✔
337
                                return errBlockStreamStopped
×
338
                        }
×
339

340
                        s.currentHeight = currentBlock.Height
1✔
341

1✔
342
                        // A new block is appended. Fail any held htlcs that
1✔
343
                        // expire at this height to prevent channel force-close.
1✔
344
                        s.failExpiredHtlcs()
1✔
345

346
                case <-s.quit:
4✔
347
                        return nil
4✔
348
                }
349
        }
350
}
351

352
func (s *InterceptableSwitch) failExpiredHtlcs() {
1✔
353
        s.heldHtlcSet.popAutoFails(
1✔
354
                uint32(s.currentHeight),
1✔
355
                func(fwd InterceptedForward) {
2✔
356
                        err := fwd.FailWithCode(
1✔
357
                                lnwire.CodeTemporaryChannelFailure,
1✔
358
                        )
1✔
359
                        if err != nil {
1✔
360
                                log.Errorf("Cannot fail packet: %v", err)
×
361
                        }
×
362
                },
363
        )
364
}
365

366
func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
11✔
367
        err := s.interceptor(fwd.Packet())
11✔
368
        if err != nil {
11✔
369
                // Only log the error. If we couldn't send the packet, we assume
×
370
                // that the interceptor will reconnect so that we can retry.
×
371
                log.Debugf("Interceptor cannot handle forward: %v", err)
×
372
        }
×
373
}
374

375
func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) {
9✔
376
        s.interceptor = interceptor
9✔
377

9✔
378
        // Replay all currently held htlcs. When an interceptor is not required,
9✔
379
        // there may be none because they've been cleared after the previous
9✔
380
        // disconnect.
9✔
381
        if interceptor != nil {
16✔
382
                log.Debugf("Interceptor connected")
7✔
383

7✔
384
                s.heldHtlcSet.forEach(s.sendForward)
7✔
385

7✔
386
                return
7✔
387
        }
7✔
388

389
        // The interceptor disconnects. If an interceptor is required, keep the
390
        // held htlcs.
391
        if s.requireInterceptor {
3✔
392
                log.Infof("Interceptor disconnected, retaining held packets")
1✔
393

1✔
394
                return
1✔
395
        }
1✔
396

397
        // Interceptor is not required. Release held forwards.
398
        log.Infof("Interceptor disconnected, resolving held packets")
1✔
399

1✔
400
        s.heldHtlcSet.popAll(func(fwd InterceptedForward) {
2✔
401
                err := fwd.Resume()
1✔
402
                if err != nil {
1✔
403
                        log.Errorf("Failed to resume hold forward %v", err)
×
404
                }
×
405
        })
406
}
407

408
// resolve processes a HTLC given the resolution type specified by the
409
// intercepting client.
410
func (s *InterceptableSwitch) resolve(res *FwdResolution) error {
9✔
411
        intercepted, err := s.heldHtlcSet.pop(res.Key)
9✔
412
        if err != nil {
10✔
413
                return err
1✔
414
        }
1✔
415

416
        switch res.Action {
8✔
417
        case FwdActionResume:
1✔
418
                return intercepted.Resume()
1✔
419

420
        case FwdActionResumeModified:
×
421
                return intercepted.ResumeModified(
×
422
                        res.InAmountMsat, res.OutAmountMsat,
×
423
                        res.OutWireCustomRecords,
×
424
                )
×
425

426
        case FwdActionSettle:
2✔
427
                return intercepted.Settle(res.Preimage)
2✔
428

429
        case FwdActionFail:
5✔
430
                if len(res.FailureMessage) > 0 {
6✔
431
                        return intercepted.Fail(res.FailureMessage)
1✔
432
                }
1✔
433

434
                return intercepted.FailWithCode(res.FailureCode)
4✔
435

436
        default:
×
437
                return fmt.Errorf("unrecognized action %v", res.Action)
×
438
        }
439
}
440

441
// Resolve resolves an intercepted packet.
442
func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
7✔
443
        internalRes := &fwdResolution{
7✔
444
                resolution: res,
7✔
445
                errChan:    make(chan error, 1),
7✔
446
        }
7✔
447

7✔
448
        select {
7✔
449
        case s.resolutionChan <- internalRes:
7✔
450

451
        case <-s.quit:
×
452
                return errors.New("switch shutting down")
×
453
        }
454

455
        select {
7✔
456
        case err := <-internalRes.errChan:
7✔
457
                return err
7✔
458

459
        case <-s.quit:
×
460
                return errors.New("switch shutting down")
×
461
        }
462
}
463

464
// ForwardPackets attempts to forward the batch of htlcs to a connected
465
// interceptor. If the interceptor signals the resume action, the htlcs are
466
// forwarded to the switch. The link's quit signal should be provided to allow
467
// cancellation of forwarding during link shutdown.
468
func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{},
469
        isReplay bool, packets ...*htlcPacket) error {
16✔
470

16✔
471
        // Synchronize with the main event loop. This should be light in the
16✔
472
        // case where there is no interceptor.
16✔
473
        select {
16✔
474
        case s.intercepted <- &interceptedPackets{
475
                packets:  packets,
476
                linkQuit: linkQuit,
477
                isReplay: isReplay,
478
        }:
16✔
479

480
        case <-linkQuit:
×
481
                log.Debugf("Forward cancelled because link quit")
×
482

483
        case <-s.quit:
×
484
                return errors.New("interceptable switch quit")
×
485
        }
486

487
        return nil
16✔
488
}
489

490
// ForwardPacket forwards a single htlc to the external interceptor.
491
func (s *InterceptableSwitch) ForwardPacket(
492
        fwd InterceptedForward) error {
×
493

×
494
        select {
×
495
        case s.onchainIntercepted <- fwd:
×
496

497
        case <-s.quit:
×
498
                return errors.New("interceptable switch quit")
×
499
        }
500

501
        return nil
×
502
}
503

504
// interceptForward forwards the packet to the external interceptor after
505
// checking the interception criteria.
506
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
507
        isReplay bool) (bool, error) {
16✔
508

16✔
509
        switch htlc := packet.htlc.(type) {
16✔
510
        case *lnwire.UpdateAddHTLC:
13✔
511
                // We are not interested in intercepting initiated payments.
13✔
512
                if packet.incomingChanID == hop.Source {
13✔
513
                        return false, nil
×
514
                }
×
515

516
                intercepted := &interceptedForward{
13✔
517
                        htlc:       htlc,
13✔
518
                        packet:     packet,
13✔
519
                        htlcSwitch: s.htlcSwitch,
13✔
520
                        autoFailHeight: int32(packet.incomingTimeout -
13✔
521
                                s.cltvRejectDelta),
13✔
522
                }
13✔
523

13✔
524
                // Handle forwards that are too close to expiry.
13✔
525
                handled, err := s.handleExpired(intercepted)
13✔
526
                if err != nil {
14✔
527
                        log.Errorf("Error handling intercepted htlc "+
1✔
528
                                "that expires too soon: circuit=%v, "+
1✔
529
                                "incoming_timeout=%v, err=%v",
1✔
530
                                packet.inKey(), packet.incomingTimeout, err)
1✔
531

1✔
532
                        // Return false so that the packet is offered as normal
1✔
533
                        // to the switch. This isn't ideal because interception
1✔
534
                        // may be configured as always-on and is skipped now.
1✔
535
                        // Returning true isn't great either, because the htlc
1✔
536
                        // will remain stuck and potentially force-close the
1✔
537
                        // channel. But in the end, we should never get here, so
1✔
538
                        // the actual return value doesn't matter that much.
1✔
539
                        return false, nil
1✔
540
                }
1✔
541
                if handled {
13✔
542
                        return true, nil
1✔
543
                }
1✔
544

545
                return s.forward(intercepted, isReplay)
11✔
546

547
        default:
3✔
548
                return false, nil
3✔
549
        }
550
}
551

552
// forward records the intercepted htlc and forwards it to the interceptor.
553
func (s *InterceptableSwitch) forward(
554
        fwd InterceptedForward, isReplay bool) (bool, error) {
11✔
555

11✔
556
        inKey := fwd.Packet().IncomingCircuit
11✔
557

11✔
558
        // Ignore already held htlcs.
11✔
559
        if s.heldHtlcSet.exists(inKey) {
11✔
560
                return true, nil
×
561
        }
×
562

563
        // If there is no interceptor currently registered, configuration and packet
564
        // replay status determine how the packet is handled.
565
        if s.interceptor == nil {
13✔
566
                // Process normally if an interceptor is not required.
2✔
567
                if !s.requireInterceptor {
2✔
568
                        return false, nil
×
569
                }
×
570

571
                // We are in interceptor-required mode. If this is a new packet, it is
572
                // still safe to fail back. The interceptor has never seen this packet
573
                // yet. This limits the backlog of htlcs when the interceptor is down.
574
                if !isReplay {
3✔
575
                        err := fwd.FailWithCode(
1✔
576
                                lnwire.CodeTemporaryChannelFailure,
1✔
577
                        )
1✔
578
                        if err != nil {
1✔
579
                                log.Errorf("Cannot fail packet: %v", err)
×
580
                        }
×
581

582
                        return true, nil
1✔
583
                }
584

585
                // This packet is a replay. It is not safe to fail back, because the
586
                // interceptor may still signal otherwise upon reconnect. Keep the
587
                // packet in the queue until then.
588
                if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
1✔
589
                        return false, err
×
590
                }
×
591

592
                return true, nil
1✔
593
        }
594

595
        // There is an interceptor registered. We can forward the packet right now.
596
        // Hold it in the queue too to track what is outstanding.
597
        if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
9✔
598
                return false, err
×
599
        }
×
600

601
        s.sendForward(fwd)
9✔
602

9✔
603
        return true, nil
9✔
604
}
605

606
// handleExpired checks that the htlc isn't too close to the channel
607
// force-close broadcast height. If it is, it is cancelled back.
608
func (s *InterceptableSwitch) handleExpired(fwd *interceptedForward) (
609
        bool, error) {
13✔
610

13✔
611
        height := uint32(s.currentHeight)
13✔
612
        if fwd.packet.incomingTimeout >= height+s.cltvInterceptDelta {
24✔
613
                return false, nil
11✔
614
        }
11✔
615

616
        log.Debugf("Interception rejected because htlc "+
2✔
617
                "expires too soon: circuit=%v, "+
2✔
618
                "height=%v, incoming_timeout=%v",
2✔
619
                fwd.packet.inKey(), height,
2✔
620
                fwd.packet.incomingTimeout)
2✔
621

2✔
622
        err := fwd.FailWithCode(
2✔
623
                lnwire.CodeExpiryTooSoon,
2✔
624
        )
2✔
625
        if err != nil {
3✔
626
                return false, err
1✔
627
        }
1✔
628

629
        return true, nil
1✔
630
}
631

632
// interceptedForward implements the InterceptedForward interface.
633
// It is passed from the switch to external interceptors that are interested
634
// in holding forwards and resolve them manually.
635
type interceptedForward struct {
636
        htlc           *lnwire.UpdateAddHTLC
637
        packet         *htlcPacket
638
        htlcSwitch     *Switch
639
        autoFailHeight int32
640
}
641

642
// Packet returns the intercepted htlc packet.
643
func (f *interceptedForward) Packet() InterceptedPacket {
25✔
644
        return InterceptedPacket{
25✔
645
                IncomingCircuit: models.CircuitKey{
25✔
646
                        ChanID: f.packet.incomingChanID,
25✔
647
                        HtlcID: f.packet.incomingHTLCID,
25✔
648
                },
25✔
649
                OutgoingChanID:       f.packet.outgoingChanID,
25✔
650
                Hash:                 f.htlc.PaymentHash,
25✔
651
                OutgoingExpiry:       f.htlc.Expiry,
25✔
652
                OutgoingAmount:       f.htlc.Amount,
25✔
653
                IncomingAmount:       f.packet.incomingAmount,
25✔
654
                IncomingExpiry:       f.packet.incomingTimeout,
25✔
655
                InOnionCustomRecords: f.packet.inOnionCustomRecords,
25✔
656
                OnionBlob:            f.htlc.OnionBlob,
25✔
657
                AutoFailHeight:       f.autoFailHeight,
25✔
658
                InWireCustomRecords:  f.packet.inWireCustomRecords,
25✔
659
        }
25✔
660
}
25✔
661

662
// Resume resumes the default behavior as if the packet was not intercepted.
663
func (f *interceptedForward) Resume() error {
2✔
664
        // Forward to the switch. A link quit channel isn't needed, because we
2✔
665
        // are on a different thread now.
2✔
666
        return f.htlcSwitch.ForwardPackets(nil, f.packet)
2✔
667
}
2✔
668

669
// ResumeModified resumes the default behavior with field modifications. The
670
// input amount (if provided) specifies that the value of the inbound HTLC
671
// should be interpreted differently from the on-chain amount during further
672
// validation. The presence of an output amount and/or custom records indicates
673
// that those values should be modified on the outgoing HTLC.
674
func (f *interceptedForward) ResumeModified(
675
        inAmountMsat fn.Option[lnwire.MilliSatoshi],
676
        outAmountMsat fn.Option[lnwire.MilliSatoshi],
677
        outWireCustomRecords fn.Option[lnwire.CustomRecords]) error {
×
678

×
679
        // Convert the optional custom records to the correct type and validate
×
680
        // them.
×
681
        validatedRecords, err := fn.MapOptionZ(
×
682
                outWireCustomRecords,
×
683
                func(cr lnwire.CustomRecords) fn.Result[lnwire.CustomRecords] {
×
684
                        if len(cr) == 0 {
×
685
                                return fn.Ok[lnwire.CustomRecords](nil)
×
686
                        }
×
687

688
                        // Type cast and validate custom records.
689
                        err := cr.Validate()
×
690
                        if err != nil {
×
691
                                return fn.Err[lnwire.CustomRecords](
×
692
                                        fmt.Errorf("failed to validate "+
×
693
                                                "custom records: %w", err),
×
694
                                )
×
695
                        }
×
696

697
                        return fn.Ok(cr)
×
698
                },
699
        ).Unpack()
700
        if err != nil {
×
701
                return fmt.Errorf("failed to encode custom records: %w",
×
702
                        err)
×
703
        }
×
704

705
        // Set the incoming amount, if it is provided, on the packet.
706
        inAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
×
707
                f.packet.incomingAmount = amount
×
708
        })
×
709

710
        // Modify the wire message contained in the packet.
711
        switch htlc := f.packet.htlc.(type) {
×
712
        case *lnwire.UpdateAddHTLC:
×
713
                outAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
×
714
                        f.packet.amount = amount
×
715
                        htlc.Amount = amount
×
716
                })
×
717

718
                // Merge custom records with any validated records that were
719
                // added in the modify request, overwriting any existing values
720
                // with those supplied in the modifier API.
721
                htlc.CustomRecords = htlc.CustomRecords.MergedCopy(
×
722
                        validatedRecords,
×
723
                )
×
724

725
        case *lnwire.UpdateFulfillHTLC:
×
726
                if len(validatedRecords) > 0 {
×
727
                        htlc.CustomRecords = validatedRecords
×
728
                }
×
729
        }
730

731
        log.Tracef("Forwarding packet %v", lnutils.SpewLogClosure(f.packet))
×
732

×
733
        // Forward to the switch. A link quit channel isn't needed, because we
×
734
        // are on a different thread now.
×
735
        return f.htlcSwitch.ForwardPackets(nil, f.packet)
×
736
}
737

738
// Fail notifies the intention to Fail an existing hold forward with an
739
// encrypted failure reason.
740
func (f *interceptedForward) Fail(reason []byte) error {
1✔
741
        obfuscatedReason := f.packet.obfuscator.IntermediateEncrypt(reason)
1✔
742

1✔
743
        return f.resolve(&lnwire.UpdateFailHTLC{
1✔
744
                Reason: obfuscatedReason,
1✔
745
        })
1✔
746
}
1✔
747

748
// FailWithCode notifies the intention to fail an existing hold forward with the
749
// specified failure code.
750
func (f *interceptedForward) FailWithCode(code lnwire.FailCode) error {
8✔
751
        shaOnionBlob := func() [32]byte {
9✔
752
                return sha256.Sum256(f.htlc.OnionBlob[:])
1✔
753
        }
1✔
754

755
        // Create a local failure.
756
        var failureMsg lnwire.FailureMessage
8✔
757

8✔
758
        switch code {
8✔
759
        case lnwire.CodeInvalidOnionVersion:
×
760
                failureMsg = &lnwire.FailInvalidOnionVersion{
×
761
                        OnionSHA256: shaOnionBlob(),
×
762
                }
×
763

764
        case lnwire.CodeInvalidOnionHmac:
×
765
                failureMsg = &lnwire.FailInvalidOnionHmac{
×
766
                        OnionSHA256: shaOnionBlob(),
×
767
                }
×
768

769
        case lnwire.CodeInvalidOnionKey:
1✔
770
                failureMsg = &lnwire.FailInvalidOnionKey{
1✔
771
                        OnionSHA256: shaOnionBlob(),
1✔
772
                }
1✔
773

774
        case lnwire.CodeTemporaryChannelFailure:
5✔
775
                update := f.htlcSwitch.failAliasUpdate(
5✔
776
                        f.packet.incomingChanID, true,
5✔
777
                )
5✔
778
                if update == nil {
8✔
779
                        // Fallback to the original, non-alias behavior.
3✔
780
                        var err error
3✔
781
                        update, err = f.htlcSwitch.cfg.FetchLastChannelUpdate(
3✔
782
                                f.packet.incomingChanID,
3✔
783
                        )
3✔
784
                        if err != nil {
3✔
785
                                return err
×
786
                        }
×
787
                }
788

789
                failureMsg = lnwire.NewTemporaryChannelFailure(update)
5✔
790

791
        case lnwire.CodeExpiryTooSoon:
2✔
792
                update, err := f.htlcSwitch.cfg.FetchLastChannelUpdate(
2✔
793
                        f.packet.incomingChanID,
2✔
794
                )
2✔
795
                if err != nil {
3✔
796
                        return err
1✔
797
                }
1✔
798

799
                failureMsg = lnwire.NewExpiryTooSoon(*update)
1✔
800

801
        default:
×
802
                return ErrUnsupportedFailureCode
×
803
        }
804

805
        // Encrypt the failure for the first hop. This node will be the origin
806
        // of the failure.
807
        reason, err := f.packet.obfuscator.EncryptFirstHop(failureMsg)
7✔
808
        if err != nil {
7✔
809
                return fmt.Errorf("failed to encrypt failure reason %w", err)
×
810
        }
×
811

812
        return f.resolve(&lnwire.UpdateFailHTLC{
7✔
813
                Reason: reason,
7✔
814
        })
7✔
815
}
816

817
// Settle forwards a settled packet to the switch.
818
func (f *interceptedForward) Settle(preimage lntypes.Preimage) error {
2✔
819
        if !preimage.Matches(f.htlc.PaymentHash) {
2✔
820
                return errors.New("preimage does not match hash")
×
821
        }
×
822
        return f.resolve(&lnwire.UpdateFulfillHTLC{
2✔
823
                PaymentPreimage: preimage,
2✔
824
        })
2✔
825
}
826

827
// resolve is used for both Settle and Fail and forwards the message to the
828
// switch.
829
func (f *interceptedForward) resolve(message lnwire.Message) error {
10✔
830
        pkt := &htlcPacket{
10✔
831
                incomingChanID: f.packet.incomingChanID,
10✔
832
                incomingHTLCID: f.packet.incomingHTLCID,
10✔
833
                outgoingChanID: f.packet.outgoingChanID,
10✔
834
                outgoingHTLCID: f.packet.outgoingHTLCID,
10✔
835
                isResolution:   true,
10✔
836
                circuit:        f.packet.circuit,
10✔
837
                htlc:           message,
10✔
838
                obfuscator:     f.packet.obfuscator,
10✔
839
                sourceRef:      f.packet.sourceRef,
10✔
840
        }
10✔
841
        return f.htlcSwitch.mailOrchestrator.Deliver(pkt.incomingChanID, pkt)
10✔
842
}
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc