• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14350609076

09 Apr 2025 06:35AM UTC coverage: 58.66%. First build
14350609076

Pull #9691

github

web-flow
Merge 001660a9d into ac052988c
Pull Request #9691: htlcswitch+peer: thread context through in preparation for passing to graph DB calls

192 of 264 new or added lines in 15 files covered. (72.73%)

97242 of 165773 relevant lines covered (58.66%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.58
/htlcswitch/interceptable_switch.go
1
package htlcswitch
2

3
import (
4
        "context"
5
        "crypto/sha256"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/go-errors/errors"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
15
        "github.com/lightningnetwork/lnd/lntypes"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
)
19

20
var (
21
        // ErrFwdNotExists is an error returned when the caller tries to resolve
22
        // a forward that doesn't exist anymore.
23
        ErrFwdNotExists = errors.New("forward does not exist")
24

25
        // ErrUnsupportedFailureCode when processing of an unsupported failure
26
        // code is attempted.
27
        ErrUnsupportedFailureCode = errors.New("unsupported failure code")
28

29
        errBlockStreamStopped = errors.New("block epoch stream stopped")
30
)
31

32
// InterceptableSwitch is an implementation of ForwardingSwitch interface.
33
// This implementation is used like a proxy that wraps the switch and
34
// intercepts forward requests. A reference to the Switch is held in order
35
// to communicate back the interception result where the options are:
36
// Resume - forwards the original request to the switch as is.
37
// Settle - routes UpdateFulfillHTLC to the originating link.
38
// Fail - routes UpdateFailHTLC to the originating link.
39
type InterceptableSwitch struct {
40
        started atomic.Bool
41
        stopped atomic.Bool
42

43
        // htlcSwitch is the underline switch
44
        htlcSwitch *Switch
45

46
        // intercepted is where we stream all intercepted packets coming from
47
        // the switch.
48
        intercepted chan *interceptedPackets
49

50
        // resolutionChan is where we stream all responses coming from the
51
        // interceptor client.
52
        resolutionChan chan *fwdResolution
53

54
        onchainIntercepted chan InterceptedForward
55

56
        // interceptorRegistration is a channel that we use to synchronize
57
        // client connect and disconnect.
58
        interceptorRegistration chan ForwardInterceptor
59

60
        // requireInterceptor indicates whether processing should block if no
61
        // interceptor is connected.
62
        requireInterceptor bool
63

64
        // interceptor is the handler for intercepted packets.
65
        interceptor ForwardInterceptor
66

67
        // heldHtlcSet keeps track of outstanding intercepted forwards.
68
        heldHtlcSet *heldHtlcSet
69

70
        // cltvRejectDelta defines the number of blocks before the expiry of the
71
        // htlc where we no longer intercept it and instead cancel it back.
72
        cltvRejectDelta uint32
73

74
        // cltvInterceptDelta defines the number of blocks before the expiry of
75
        // the htlc where we don't intercept anymore. This value must be greater
76
        // than CltvRejectDelta, because we don't want to offer htlcs to the
77
        // interceptor client for which there is no time left to resolve them
78
        // anymore.
79
        cltvInterceptDelta uint32
80

81
        // notifier is an instance of a chain notifier that we'll use to signal
82
        // the switch when a new block has arrived.
83
        notifier chainntnfs.ChainNotifier
84

85
        // blockEpochStream is an active block epoch event stream backed by an
86
        // active ChainNotifier instance. This will be used to retrieve the
87
        // latest height of the chain.
88
        blockEpochStream *chainntnfs.BlockEpochEvent
89

90
        // currentHeight is the currently best known height.
91
        currentHeight int32
92

93
        wg     sync.WaitGroup
94
        quit   chan struct{}
95
        cancel fn.Option[context.CancelFunc]
96
}
97

98
type interceptedPackets struct {
99
        packets  []*htlcPacket
100
        linkQuit <-chan struct{}
101
        isReplay bool
102
}
103

104
// FwdAction defines the various resolution types.
105
type FwdAction int
106

107
const (
108
        // FwdActionResume forwards the intercepted packet to the switch.
109
        FwdActionResume FwdAction = iota
110

111
        // FwdActionSettle settles the intercepted packet with a preimage.
112
        FwdActionSettle
113

114
        // FwdActionFail fails the intercepted packet back to the sender.
115
        FwdActionFail
116

117
        // FwdActionResumeModified forwards the intercepted packet to the switch
118
        // with modifications.
119
        FwdActionResumeModified
120
)
121

122
// FwdResolution defines the action to be taken on an intercepted packet.
123
type FwdResolution struct {
124
        // Key is the incoming circuit key of the htlc.
125
        Key models.CircuitKey
126

127
        // Action is the action to take on the intercepted htlc.
128
        Action FwdAction
129

130
        // Preimage is the preimage that is to be used for settling if Action is
131
        // FwdActionSettle.
132
        Preimage lntypes.Preimage
133

134
        // InAmountMsat is the amount that is to be used for validating if
135
        // Action is FwdActionResumeModified.
136
        InAmountMsat fn.Option[lnwire.MilliSatoshi]
137

138
        // OutAmountMsat is the amount that is to be used for forwarding if
139
        // Action is FwdActionResumeModified.
140
        OutAmountMsat fn.Option[lnwire.MilliSatoshi]
141

142
        // OutWireCustomRecords is the custom records that are to be used for
143
        // forwarding if Action is FwdActionResumeModified.
144
        OutWireCustomRecords fn.Option[lnwire.CustomRecords]
145

146
        // FailureMessage is the encrypted failure message that is to be passed
147
        // back to the sender if action is FwdActionFail.
148
        FailureMessage []byte
149

150
        // FailureCode is the failure code that is to be passed back to the
151
        // sender if action is FwdActionFail.
152
        FailureCode lnwire.FailCode
153
}
154

155
type fwdResolution struct {
156
        resolution *FwdResolution
157
        errChan    chan error
158
}
159

160
// InterceptableSwitchConfig contains the configuration of InterceptableSwitch.
161
type InterceptableSwitchConfig struct {
162
        // Switch is a reference to the actual switch implementation that
163
        // packets get sent to on resume.
164
        Switch *Switch
165

166
        // Notifier is an instance of a chain notifier that we'll use to signal
167
        // the switch when a new block has arrived.
168
        Notifier chainntnfs.ChainNotifier
169

170
        // CltvRejectDelta defines the number of blocks before the expiry of the
171
        // htlc where we auto-fail an intercepted htlc to prevent channel
172
        // force-closure.
173
        CltvRejectDelta uint32
174

175
        // CltvInterceptDelta defines the number of blocks before the expiry of
176
        // the htlc where we don't intercept anymore. This value must be greater
177
        // than CltvRejectDelta, because we don't want to offer htlcs to the
178
        // interceptor client for which there is no time left to resolve them
179
        // anymore.
180
        CltvInterceptDelta uint32
181

182
        // RequireInterceptor indicates whether processing should block if no
183
        // interceptor is connected.
184
        RequireInterceptor bool
185
}
186

187
// NewInterceptableSwitch returns an instance of InterceptableSwitch.
188
func NewInterceptableSwitch(cfg *InterceptableSwitchConfig) (
189
        *InterceptableSwitch, error) {
3✔
190

3✔
191
        if cfg.CltvInterceptDelta <= cfg.CltvRejectDelta {
3✔
192
                return nil, fmt.Errorf("cltv intercept delta %v not greater "+
×
193
                        "than cltv reject delta %v",
×
194
                        cfg.CltvInterceptDelta, cfg.CltvRejectDelta)
×
195
        }
×
196

197
        return &InterceptableSwitch{
3✔
198
                htlcSwitch:              cfg.Switch,
3✔
199
                intercepted:             make(chan *interceptedPackets),
3✔
200
                onchainIntercepted:      make(chan InterceptedForward),
3✔
201
                interceptorRegistration: make(chan ForwardInterceptor),
3✔
202
                heldHtlcSet:             newHeldHtlcSet(),
3✔
203
                resolutionChan:          make(chan *fwdResolution),
3✔
204
                requireInterceptor:      cfg.RequireInterceptor,
3✔
205
                cltvRejectDelta:         cfg.CltvRejectDelta,
3✔
206
                cltvInterceptDelta:      cfg.CltvInterceptDelta,
3✔
207
                notifier:                cfg.Notifier,
3✔
208

3✔
209
                quit: make(chan struct{}),
3✔
210
        }, nil
3✔
211
}
212

213
// SetInterceptor sets the ForwardInterceptor to be used. A nil argument
214
// unregisters the current interceptor.
215
func (s *InterceptableSwitch) SetInterceptor(
216
        interceptor ForwardInterceptor) {
3✔
217

3✔
218
        // Synchronize setting the handler with the main loop to prevent race
3✔
219
        // conditions.
3✔
220
        select {
3✔
221
        case s.interceptorRegistration <- interceptor:
3✔
222

223
        case <-s.quit:
×
224
        }
225
}
226

227
func (s *InterceptableSwitch) Start(ctx context.Context) error {
3✔
228
        log.Info("InterceptableSwitch starting...")
3✔
229

3✔
230
        if s.started.Swap(true) {
3✔
231
                return fmt.Errorf("InterceptableSwitch started more than once")
×
232
        }
×
233
        ctx, cancel := context.WithCancel(ctx)
3✔
234
        s.cancel = fn.Some(cancel)
3✔
235

3✔
236
        blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
3✔
237
        if err != nil {
3✔
238
                return err
×
239
        }
×
240
        s.blockEpochStream = blockEpochStream
3✔
241

3✔
242
        s.wg.Add(1)
3✔
243
        go func() {
6✔
244
                defer s.wg.Done()
3✔
245

3✔
246
                err := s.run(ctx)
3✔
247
                if err != nil {
3✔
248
                        log.Errorf("InterceptableSwitch stopped: %v", err)
×
249
                }
×
250
        }()
251

252
        log.Debug("InterceptableSwitch started")
3✔
253

3✔
254
        return nil
3✔
255
}
256

257
func (s *InterceptableSwitch) Stop() error {
3✔
258
        log.Info("InterceptableSwitch shutting down...")
3✔
259

3✔
260
        if s.stopped.Swap(true) {
3✔
261
                return fmt.Errorf("InterceptableSwitch stopped more than once")
×
262
        }
×
263

264
        s.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
6✔
265
        close(s.quit)
3✔
266
        s.wg.Wait()
3✔
267

3✔
268
        // We need to check whether the start routine run and initialized the
3✔
269
        // `blockEpochStream`.
3✔
270
        if s.blockEpochStream != nil {
6✔
271
                s.blockEpochStream.Cancel()
3✔
272
        }
3✔
273

274
        log.Debug("InterceptableSwitch shutdown complete")
3✔
275

3✔
276
        return nil
3✔
277
}
278

279
func (s *InterceptableSwitch) run(ctx context.Context) error {
3✔
280
        // The block epoch stream will immediately stream the current height.
3✔
281
        // Read it out here.
3✔
282
        select {
3✔
283
        case currentBlock, ok := <-s.blockEpochStream.Epochs:
3✔
284
                if !ok {
3✔
285
                        return errBlockStreamStopped
×
286
                }
×
287
                s.currentHeight = currentBlock.Height
3✔
288

289
        case <-s.quit:
×
290
                return nil
×
291
        }
292

293
        log.Debugf("InterceptableSwitch running: height=%v, "+
3✔
294
                "requireInterceptor=%v", s.currentHeight, s.requireInterceptor)
3✔
295

3✔
296
        for {
6✔
297
                select {
3✔
298
                // An interceptor registration or de-registration came in.
299
                case interceptor := <-s.interceptorRegistration:
3✔
300
                        s.setInterceptor(ctx, interceptor)
3✔
301

302
                case packets := <-s.intercepted:
3✔
303
                        var notIntercepted []*htlcPacket
3✔
304
                        for _, p := range packets.packets {
6✔
305
                                intercepted, err := s.interceptForward(
3✔
306
                                        ctx, p, packets.isReplay,
3✔
307
                                )
3✔
308
                                if err != nil {
3✔
309
                                        return err
×
310
                                }
×
311

312
                                if !intercepted {
6✔
313
                                        notIntercepted = append(
3✔
314
                                                notIntercepted, p,
3✔
315
                                        )
3✔
316
                                }
3✔
317
                        }
318
                        err := s.htlcSwitch.ForwardPackets(
3✔
319
                                ctx, packets.linkQuit, notIntercepted...,
3✔
320
                        )
3✔
321
                        if err != nil {
3✔
322
                                log.Errorf("Cannot forward packets: %v", err)
×
323
                        }
×
324

325
                case fwd := <-s.onchainIntercepted:
3✔
326
                        // For on-chain interceptions, we don't know if it has
3✔
327
                        // already been offered before. This information is in
3✔
328
                        // the forwarding package which isn't easily accessible
3✔
329
                        // from contractcourt. It is likely though that it was
3✔
330
                        // already intercepted in the off-chain flow. And even
3✔
331
                        // if not, it is safe to signal replay so that we won't
3✔
332
                        // unexpectedly skip over this htlc.
3✔
333
                        if _, err := s.forward(ctx, fwd, true); err != nil {
3✔
334
                                return err
×
335
                        }
×
336

337
                case res := <-s.resolutionChan:
3✔
338
                        res.errChan <- s.resolve(ctx, res.resolution)
3✔
339

340
                case currentBlock, ok := <-s.blockEpochStream.Epochs:
3✔
341
                        if !ok {
3✔
342
                                return errBlockStreamStopped
×
343
                        }
×
344

345
                        s.currentHeight = currentBlock.Height
3✔
346

3✔
347
                        // A new block is appended. Fail any held htlcs that
3✔
348
                        // expire at this height to prevent channel force-close.
3✔
349
                        s.failExpiredHtlcs(ctx)
3✔
350

351
                case <-s.quit:
3✔
352
                        return nil
3✔
353
                }
354
        }
355
}
356

357
func (s *InterceptableSwitch) failExpiredHtlcs(ctx context.Context) {
3✔
358
        s.heldHtlcSet.popAutoFails(
3✔
359
                ctx, uint32(s.currentHeight),
3✔
360
                func(ctx context.Context, fwd InterceptedForward) {
3✔
361
                        err := fwd.FailWithCode(
×
NEW
362
                                ctx, lnwire.CodeTemporaryChannelFailure,
×
363
                        )
×
364
                        if err != nil {
×
365
                                log.Errorf("Cannot fail packet: %v", err)
×
366
                        }
×
367
                },
368
        )
369
}
370

371
func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
3✔
372
        err := s.interceptor(fwd.Packet())
3✔
373
        if err != nil {
3✔
374
                // Only log the error. If we couldn't send the packet, we assume
×
375
                // that the interceptor will reconnect so that we can retry.
×
376
                log.Debugf("Interceptor cannot handle forward: %v", err)
×
377
        }
×
378
}
379

380
func (s *InterceptableSwitch) setInterceptor(ctx context.Context,
381
        interceptor ForwardInterceptor) {
3✔
382

3✔
383
        s.interceptor = interceptor
3✔
384

3✔
385
        // Replay all currently held htlcs. When an interceptor is not required,
3✔
386
        // there may be none because they've been cleared after the previous
3✔
387
        // disconnect.
3✔
388
        if interceptor != nil {
6✔
389
                log.Debugf("Interceptor connected")
3✔
390

3✔
391
                s.heldHtlcSet.forEach(s.sendForward)
3✔
392

3✔
393
                return
3✔
394
        }
3✔
395

396
        // The interceptor disconnects. If an interceptor is required, keep the
397
        // held htlcs.
398
        if s.requireInterceptor {
6✔
399
                log.Infof("Interceptor disconnected, retaining held packets")
3✔
400

3✔
401
                return
3✔
402
        }
3✔
403

404
        // Interceptor is not required. Release held forwards.
405
        log.Infof("Interceptor disconnected, resolving held packets")
3✔
406

3✔
407
        s.heldHtlcSet.popAll(func(fwd InterceptedForward) {
6✔
408
                err := fwd.Resume(ctx)
3✔
409
                if err != nil {
3✔
410
                        log.Errorf("Failed to resume hold forward %v", err)
×
411
                }
×
412
        })
413
}
414

415
// resolve processes a HTLC given the resolution type specified by the
416
// intercepting client.
417
func (s *InterceptableSwitch) resolve(ctx context.Context,
418
        res *FwdResolution) error {
3✔
419

3✔
420
        intercepted, err := s.heldHtlcSet.pop(res.Key)
3✔
421
        if err != nil {
3✔
422
                return err
×
423
        }
×
424

425
        switch res.Action {
3✔
426
        case FwdActionResume:
3✔
427
                return intercepted.Resume(ctx)
3✔
428

429
        case FwdActionResumeModified:
3✔
430
                return intercepted.ResumeModified(
3✔
431
                        ctx, res.InAmountMsat, res.OutAmountMsat,
3✔
432
                        res.OutWireCustomRecords,
3✔
433
                )
3✔
434

435
        case FwdActionSettle:
3✔
436
                return intercepted.Settle(ctx, res.Preimage)
3✔
437

438
        case FwdActionFail:
3✔
439
                if len(res.FailureMessage) > 0 {
3✔
NEW
440
                        return intercepted.Fail(ctx, res.FailureMessage)
×
441
                }
×
442

443
                return intercepted.FailWithCode(ctx, res.FailureCode)
3✔
444

445
        default:
×
446
                return fmt.Errorf("unrecognized action %v", res.Action)
×
447
        }
448
}
449

450
// Resolve resolves an intercepted packet.
451
func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
3✔
452
        internalRes := &fwdResolution{
3✔
453
                resolution: res,
3✔
454
                errChan:    make(chan error, 1),
3✔
455
        }
3✔
456

3✔
457
        select {
3✔
458
        case s.resolutionChan <- internalRes:
3✔
459

460
        case <-s.quit:
×
461
                return errors.New("switch shutting down")
×
462
        }
463

464
        select {
3✔
465
        case err := <-internalRes.errChan:
3✔
466
                return err
3✔
467

468
        case <-s.quit:
×
469
                return errors.New("switch shutting down")
×
470
        }
471
}
472

473
// ForwardPackets attempts to forward the batch of htlcs to a connected
474
// interceptor. If the interceptor signals the resume action, the htlcs are
475
// forwarded to the switch. The link's quit signal should be provided to allow
476
// cancellation of forwarding during link shutdown.
477
func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{},
478
        isReplay bool, packets ...*htlcPacket) error {
3✔
479

3✔
480
        // Synchronize with the main event loop. This should be light in the
3✔
481
        // case where there is no interceptor.
3✔
482
        select {
3✔
483
        case s.intercepted <- &interceptedPackets{
484
                packets:  packets,
485
                linkQuit: linkQuit,
486
                isReplay: isReplay,
487
        }:
3✔
488

489
        case <-linkQuit:
×
490
                log.Debugf("Forward cancelled because link quit")
×
491

492
        case <-s.quit:
×
493
                return errors.New("interceptable switch quit")
×
494
        }
495

496
        return nil
3✔
497
}
498

499
// ForwardPacket forwards a single htlc to the external interceptor.
500
func (s *InterceptableSwitch) ForwardPacket(
501
        fwd InterceptedForward) error {
3✔
502

3✔
503
        select {
3✔
504
        case s.onchainIntercepted <- fwd:
3✔
505

506
        case <-s.quit:
×
507
                return errors.New("interceptable switch quit")
×
508
        }
509

510
        return nil
3✔
511
}
512

513
// interceptForward forwards the packet to the external interceptor after
514
// checking the interception criteria.
515
func (s *InterceptableSwitch) interceptForward(ctx context.Context,
516
        packet *htlcPacket, isReplay bool) (bool, error) {
3✔
517

3✔
518
        switch htlc := packet.htlc.(type) {
3✔
519
        case *lnwire.UpdateAddHTLC:
3✔
520
                // We are not interested in intercepting initiated payments.
3✔
521
                if packet.incomingChanID == hop.Source {
3✔
522
                        return false, nil
×
523
                }
×
524

525
                intercepted := &interceptedForward{
3✔
526
                        htlc:       htlc,
3✔
527
                        packet:     packet,
3✔
528
                        htlcSwitch: s.htlcSwitch,
3✔
529
                        autoFailHeight: int32(packet.incomingTimeout -
3✔
530
                                s.cltvRejectDelta),
3✔
531
                }
3✔
532

3✔
533
                // Handle forwards that are too close to expiry.
3✔
534
                handled, err := s.handleExpired(ctx, intercepted)
3✔
535
                if err != nil {
3✔
536
                        log.Errorf("Error handling intercepted htlc "+
×
537
                                "that expires too soon: circuit=%v, "+
×
538
                                "incoming_timeout=%v, err=%v",
×
539
                                packet.inKey(), packet.incomingTimeout, err)
×
540

×
541
                        // Return false so that the packet is offered as normal
×
542
                        // to the switch. This isn't ideal because interception
×
543
                        // may be configured as always-on and is skipped now.
×
544
                        // Returning true isn't great either, because the htlc
×
545
                        // will remain stuck and potentially force-close the
×
546
                        // channel. But in the end, we should never get here, so
×
547
                        // the actual return value doesn't matter that much.
×
548
                        return false, nil
×
549
                }
×
550
                if handled {
3✔
551
                        return true, nil
×
552
                }
×
553

554
                return s.forward(ctx, intercepted, isReplay)
3✔
555

556
        default:
3✔
557
                return false, nil
3✔
558
        }
559
}
560

561
// forward records the intercepted htlc and forwards it to the interceptor.
562
func (s *InterceptableSwitch) forward(ctx context.Context,
563
        fwd InterceptedForward, isReplay bool) (bool, error) {
3✔
564

3✔
565
        inKey := fwd.Packet().IncomingCircuit
3✔
566

3✔
567
        // Ignore already held htlcs.
3✔
568
        if s.heldHtlcSet.exists(inKey) {
6✔
569
                return true, nil
3✔
570
        }
3✔
571

572
        // If there is no interceptor currently registered, configuration and packet
573
        // replay status determine how the packet is handled.
574
        if s.interceptor == nil {
6✔
575
                // Process normally if an interceptor is not required.
3✔
576
                if !s.requireInterceptor {
6✔
577
                        return false, nil
3✔
578
                }
3✔
579

580
                // We are in interceptor-required mode. If this is a new packet, it is
581
                // still safe to fail back. The interceptor has never seen this packet
582
                // yet. This limits the backlog of htlcs when the interceptor is down.
583
                if !isReplay {
3✔
584
                        err := fwd.FailWithCode(
×
NEW
585
                                ctx, lnwire.CodeTemporaryChannelFailure,
×
586
                        )
×
587
                        if err != nil {
×
588
                                log.Errorf("Cannot fail packet: %v", err)
×
589
                        }
×
590

591
                        return true, nil
×
592
                }
593

594
                // This packet is a replay. It is not safe to fail back, because the
595
                // interceptor may still signal otherwise upon reconnect. Keep the
596
                // packet in the queue until then.
597
                if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
3✔
598
                        return false, err
×
599
                }
×
600

601
                return true, nil
3✔
602
        }
603

604
        // There is an interceptor registered. We can forward the packet right now.
605
        // Hold it in the queue too to track what is outstanding.
606
        if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
3✔
607
                return false, err
×
608
        }
×
609

610
        s.sendForward(fwd)
3✔
611

3✔
612
        return true, nil
3✔
613
}
614

615
// handleExpired checks that the htlc isn't too close to the channel
616
// force-close broadcast height. If it is, it is cancelled back.
617
func (s *InterceptableSwitch) handleExpired(ctx context.Context,
618
        fwd *interceptedForward) (bool, error) {
3✔
619

3✔
620
        height := uint32(s.currentHeight)
3✔
621
        if fwd.packet.incomingTimeout >= height+s.cltvInterceptDelta {
6✔
622
                return false, nil
3✔
623
        }
3✔
624

625
        log.Debugf("Interception rejected because htlc "+
×
626
                "expires too soon: circuit=%v, "+
×
627
                "height=%v, incoming_timeout=%v",
×
628
                fwd.packet.inKey(), height,
×
629
                fwd.packet.incomingTimeout)
×
630

×
631
        err := fwd.FailWithCode(
×
NEW
632
                ctx, lnwire.CodeExpiryTooSoon,
×
633
        )
×
634
        if err != nil {
×
635
                return false, err
×
636
        }
×
637

638
        return true, nil
×
639
}
640

641
// interceptedForward implements the InterceptedForward interface.
642
// It is passed from the switch to external interceptors that are interested
643
// in holding forwards and resolve them manually.
644
type interceptedForward struct {
645
        htlc           *lnwire.UpdateAddHTLC
646
        packet         *htlcPacket
647
        htlcSwitch     *Switch
648
        autoFailHeight int32
649
}
650

651
// Packet returns the intercepted htlc packet.
652
func (f *interceptedForward) Packet() InterceptedPacket {
3✔
653
        return InterceptedPacket{
3✔
654
                IncomingCircuit: models.CircuitKey{
3✔
655
                        ChanID: f.packet.incomingChanID,
3✔
656
                        HtlcID: f.packet.incomingHTLCID,
3✔
657
                },
3✔
658
                OutgoingChanID:       f.packet.outgoingChanID,
3✔
659
                Hash:                 f.htlc.PaymentHash,
3✔
660
                OutgoingExpiry:       f.htlc.Expiry,
3✔
661
                OutgoingAmount:       f.htlc.Amount,
3✔
662
                IncomingAmount:       f.packet.incomingAmount,
3✔
663
                IncomingExpiry:       f.packet.incomingTimeout,
3✔
664
                InOnionCustomRecords: f.packet.inOnionCustomRecords,
3✔
665
                OnionBlob:            f.htlc.OnionBlob,
3✔
666
                AutoFailHeight:       f.autoFailHeight,
3✔
667
                InWireCustomRecords:  f.packet.inWireCustomRecords,
3✔
668
        }
3✔
669
}
3✔
670

671
// Resume resumes the default behavior as if the packet was not intercepted.
672
func (f *interceptedForward) Resume(ctx context.Context) error {
3✔
673
        // Forward to the switch. A link quit channel isn't needed, because we
3✔
674
        // are on a different thread now.
3✔
675
        return f.htlcSwitch.ForwardPackets(ctx, nil, f.packet)
3✔
676
}
3✔
677

678
// ResumeModified resumes the default behavior with field modifications. The
679
// input amount (if provided) specifies that the value of the inbound HTLC
680
// should be interpreted differently from the on-chain amount during further
681
// validation. The presence of an output amount and/or custom records indicates
682
// that those values should be modified on the outgoing HTLC.
683
func (f *interceptedForward) ResumeModified(ctx context.Context,
684
        inAmountMsat fn.Option[lnwire.MilliSatoshi],
685
        outAmountMsat fn.Option[lnwire.MilliSatoshi],
686
        outWireCustomRecords fn.Option[lnwire.CustomRecords]) error {
3✔
687

3✔
688
        // Convert the optional custom records to the correct type and validate
3✔
689
        // them.
3✔
690
        validatedRecords, err := fn.MapOptionZ(
3✔
691
                outWireCustomRecords,
3✔
692
                func(cr lnwire.CustomRecords) fn.Result[lnwire.CustomRecords] {
6✔
693
                        if len(cr) == 0 {
3✔
694
                                return fn.Ok[lnwire.CustomRecords](nil)
×
695
                        }
×
696

697
                        // Type cast and validate custom records.
698
                        err := cr.Validate()
3✔
699
                        if err != nil {
3✔
700
                                return fn.Err[lnwire.CustomRecords](
×
701
                                        fmt.Errorf("failed to validate "+
×
702
                                                "custom records: %w", err),
×
703
                                )
×
704
                        }
×
705

706
                        return fn.Ok(cr)
3✔
707
                },
708
        ).Unpack()
709
        if err != nil {
3✔
710
                return fmt.Errorf("failed to encode custom records: %w",
×
711
                        err)
×
712
        }
×
713

714
        // Set the incoming amount, if it is provided, on the packet.
715
        inAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
3✔
716
                f.packet.incomingAmount = amount
×
717
        })
×
718

719
        // Modify the wire message contained in the packet.
720
        switch htlc := f.packet.htlc.(type) {
3✔
721
        case *lnwire.UpdateAddHTLC:
3✔
722
                outAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
6✔
723
                        f.packet.amount = amount
3✔
724
                        htlc.Amount = amount
3✔
725
                })
3✔
726

727
                // Merge custom records with any validated records that were
728
                // added in the modify request, overwriting any existing values
729
                // with those supplied in the modifier API.
730
                htlc.CustomRecords = htlc.CustomRecords.MergedCopy(
3✔
731
                        validatedRecords,
3✔
732
                )
3✔
733

734
        case *lnwire.UpdateFulfillHTLC:
×
735
                if len(validatedRecords) > 0 {
×
736
                        htlc.CustomRecords = validatedRecords
×
737
                }
×
738
        }
739

740
        log.Tracef("Forwarding packet %v", lnutils.SpewLogClosure(f.packet))
3✔
741

3✔
742
        // Forward to the switch. A link quit channel isn't needed, because we
3✔
743
        // are on a different thread now.
3✔
744
        return f.htlcSwitch.ForwardPackets(ctx, nil, f.packet)
3✔
745
}
746

747
// Fail notifies the intention to Fail an existing hold forward with an
748
// encrypted failure reason.
NEW
749
func (f *interceptedForward) Fail(ctx context.Context, reason []byte) error {
×
750
        obfuscatedReason := f.packet.obfuscator.IntermediateEncrypt(reason)
×
751

×
NEW
752
        return f.resolve(ctx, &lnwire.UpdateFailHTLC{
×
753
                Reason: obfuscatedReason,
×
754
        })
×
755
}
×
756

757
// FailWithCode notifies the intention to fail an existing hold forward with the
758
// specified failure code.
759
func (f *interceptedForward) FailWithCode(ctx context.Context,
760
        code lnwire.FailCode) error {
3✔
761

3✔
762
        shaOnionBlob := func() [32]byte {
3✔
763
                return sha256.Sum256(f.htlc.OnionBlob[:])
×
764
        }
×
765

766
        // Create a local failure.
767
        var failureMsg lnwire.FailureMessage
3✔
768

3✔
769
        switch code {
3✔
770
        case lnwire.CodeInvalidOnionVersion:
×
771
                failureMsg = &lnwire.FailInvalidOnionVersion{
×
772
                        OnionSHA256: shaOnionBlob(),
×
773
                }
×
774

775
        case lnwire.CodeInvalidOnionHmac:
×
776
                failureMsg = &lnwire.FailInvalidOnionHmac{
×
777
                        OnionSHA256: shaOnionBlob(),
×
778
                }
×
779

780
        case lnwire.CodeInvalidOnionKey:
×
781
                failureMsg = &lnwire.FailInvalidOnionKey{
×
782
                        OnionSHA256: shaOnionBlob(),
×
783
                }
×
784

785
        case lnwire.CodeTemporaryChannelFailure:
3✔
786
                update := f.htlcSwitch.failAliasUpdate(
3✔
787
                        ctx, f.packet.incomingChanID, true,
3✔
788
                )
3✔
789
                if update == nil {
6✔
790
                        // Fallback to the original, non-alias behavior.
3✔
791
                        var err error
3✔
792
                        update, err = f.htlcSwitch.cfg.FetchLastChannelUpdate(
3✔
793
                                ctx, f.packet.incomingChanID,
3✔
794
                        )
3✔
795
                        if err != nil {
3✔
796
                                return err
×
797
                        }
×
798
                }
799

800
                failureMsg = lnwire.NewTemporaryChannelFailure(update)
3✔
801

802
        case lnwire.CodeExpiryTooSoon:
×
803
                update, err := f.htlcSwitch.cfg.FetchLastChannelUpdate(
×
NEW
804
                        ctx, f.packet.incomingChanID,
×
805
                )
×
806
                if err != nil {
×
807
                        return err
×
808
                }
×
809

810
                failureMsg = lnwire.NewExpiryTooSoon(*update)
×
811

812
        default:
×
813
                return ErrUnsupportedFailureCode
×
814
        }
815

816
        // Encrypt the failure for the first hop. This node will be the origin
817
        // of the failure.
818
        reason, err := f.packet.obfuscator.EncryptFirstHop(failureMsg)
3✔
819
        if err != nil {
3✔
820
                return fmt.Errorf("failed to encrypt failure reason %w", err)
×
821
        }
×
822

823
        return f.resolve(ctx, &lnwire.UpdateFailHTLC{
3✔
824
                Reason: reason,
3✔
825
        })
3✔
826
}
827

828
// Settle forwards a settled packet to the switch.
829
func (f *interceptedForward) Settle(ctx context.Context,
830
        preimage lntypes.Preimage) error {
3✔
831

3✔
832
        if !preimage.Matches(f.htlc.PaymentHash) {
3✔
833
                return errors.New("preimage does not match hash")
×
834
        }
×
835
        return f.resolve(ctx, &lnwire.UpdateFulfillHTLC{
3✔
836
                PaymentPreimage: preimage,
3✔
837
        })
3✔
838
}
839

840
// resolve is used for both Settle and Fail and forwards the message to the
841
// switch.
842
func (f *interceptedForward) resolve(ctx context.Context,
843
        message lnwire.Message) error {
3✔
844

3✔
845
        pkt := &htlcPacket{
3✔
846
                incomingChanID: f.packet.incomingChanID,
3✔
847
                incomingHTLCID: f.packet.incomingHTLCID,
3✔
848
                outgoingChanID: f.packet.outgoingChanID,
3✔
849
                outgoingHTLCID: f.packet.outgoingHTLCID,
3✔
850
                isResolution:   true,
3✔
851
                circuit:        f.packet.circuit,
3✔
852
                htlc:           message,
3✔
853
                obfuscator:     f.packet.obfuscator,
3✔
854
                sourceRef:      f.packet.sourceRef,
3✔
855
        }
3✔
856
        return f.htlcSwitch.mailOrchestrator.Deliver(
3✔
857
                ctx, pkt.incomingChanID, pkt,
3✔
858
        )
3✔
859
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc