• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14513232475

17 Apr 2025 10:06AM UTC coverage: 58.637%. First build
14513232475

Pull #9691

github

web-flow
Merge c31f2c9aa into 695cf7c4a
Pull Request #9691: htlcswitch+peer [1/2]: thread context through in preparation for passing to graph DB calls

114 of 154 new or added lines in 10 files covered. (74.03%)

97229 of 165814 relevant lines covered (58.64%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.29
/htlcswitch/interceptable_switch.go
1
package htlcswitch
2

3
import (
4
        "context"
5
        "crypto/sha256"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/go-errors/errors"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
15
        "github.com/lightningnetwork/lnd/lntypes"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
)
19

20
var (
21
        // ErrFwdNotExists is an error returned when the caller tries to resolve
22
        // a forward that doesn't exist anymore.
23
        ErrFwdNotExists = errors.New("forward does not exist")
24

25
        // ErrUnsupportedFailureCode when processing of an unsupported failure
26
        // code is attempted.
27
        ErrUnsupportedFailureCode = errors.New("unsupported failure code")
28

29
        errBlockStreamStopped = errors.New("block epoch stream stopped")
30
)
31

32
// InterceptableSwitch is an implementation of ForwardingSwitch interface.
33
// This implementation is used like a proxy that wraps the switch and
34
// intercepts forward requests. A reference to the Switch is held in order
35
// to communicate back the interception result where the options are:
36
// Resume - forwards the original request to the switch as is.
37
// Settle - routes UpdateFulfillHTLC to the originating link.
38
// Fail - routes UpdateFailHTLC to the originating link.
39
type InterceptableSwitch struct {
40
        started atomic.Bool
41
        stopped atomic.Bool
42

43
        // htlcSwitch is the underline switch
44
        htlcSwitch *Switch
45

46
        // intercepted is where we stream all intercepted packets coming from
47
        // the switch.
48
        intercepted chan *interceptedPackets
49

50
        // resolutionChan is where we stream all responses coming from the
51
        // interceptor client.
52
        resolutionChan chan *fwdResolution
53

54
        onchainIntercepted chan InterceptedForward
55

56
        // interceptorRegistration is a channel that we use to synchronize
57
        // client connect and disconnect.
58
        interceptorRegistration chan ForwardInterceptor
59

60
        // requireInterceptor indicates whether processing should block if no
61
        // interceptor is connected.
62
        requireInterceptor bool
63

64
        // interceptor is the handler for intercepted packets.
65
        interceptor ForwardInterceptor
66

67
        // heldHtlcSet keeps track of outstanding intercepted forwards.
68
        heldHtlcSet *heldHtlcSet
69

70
        // cltvRejectDelta defines the number of blocks before the expiry of the
71
        // htlc where we no longer intercept it and instead cancel it back.
72
        cltvRejectDelta uint32
73

74
        // cltvInterceptDelta defines the number of blocks before the expiry of
75
        // the htlc where we don't intercept anymore. This value must be greater
76
        // than CltvRejectDelta, because we don't want to offer htlcs to the
77
        // interceptor client for which there is no time left to resolve them
78
        // anymore.
79
        cltvInterceptDelta uint32
80

81
        // notifier is an instance of a chain notifier that we'll use to signal
82
        // the switch when a new block has arrived.
83
        notifier chainntnfs.ChainNotifier
84

85
        // blockEpochStream is an active block epoch event stream backed by an
86
        // active ChainNotifier instance. This will be used to retrieve the
87
        // latest height of the chain.
88
        blockEpochStream *chainntnfs.BlockEpochEvent
89

90
        // currentHeight is the currently best known height.
91
        currentHeight int32
92

93
        wg     sync.WaitGroup
94
        quit   chan struct{}
95
        cancel fn.Option[context.CancelFunc]
96
}
97

98
type interceptedPackets struct {
99
        packets  []*htlcPacket
100
        linkQuit <-chan struct{}
101
        isReplay bool
102
}
103

104
// FwdAction defines the various resolution types.
105
type FwdAction int
106

107
const (
108
        // FwdActionResume forwards the intercepted packet to the switch.
109
        FwdActionResume FwdAction = iota
110

111
        // FwdActionSettle settles the intercepted packet with a preimage.
112
        FwdActionSettle
113

114
        // FwdActionFail fails the intercepted packet back to the sender.
115
        FwdActionFail
116

117
        // FwdActionResumeModified forwards the intercepted packet to the switch
118
        // with modifications.
119
        FwdActionResumeModified
120
)
121

122
// FwdResolution defines the action to be taken on an intercepted packet.
123
type FwdResolution struct {
124
        // Key is the incoming circuit key of the htlc.
125
        Key models.CircuitKey
126

127
        // Action is the action to take on the intercepted htlc.
128
        Action FwdAction
129

130
        // Preimage is the preimage that is to be used for settling if Action is
131
        // FwdActionSettle.
132
        Preimage lntypes.Preimage
133

134
        // InAmountMsat is the amount that is to be used for validating if
135
        // Action is FwdActionResumeModified.
136
        InAmountMsat fn.Option[lnwire.MilliSatoshi]
137

138
        // OutAmountMsat is the amount that is to be used for forwarding if
139
        // Action is FwdActionResumeModified.
140
        OutAmountMsat fn.Option[lnwire.MilliSatoshi]
141

142
        // OutWireCustomRecords is the custom records that are to be used for
143
        // forwarding if Action is FwdActionResumeModified.
144
        OutWireCustomRecords fn.Option[lnwire.CustomRecords]
145

146
        // FailureMessage is the encrypted failure message that is to be passed
147
        // back to the sender if action is FwdActionFail.
148
        FailureMessage []byte
149

150
        // FailureCode is the failure code that is to be passed back to the
151
        // sender if action is FwdActionFail.
152
        FailureCode lnwire.FailCode
153
}
154

155
type fwdResolution struct {
156
        resolution *FwdResolution
157
        errChan    chan error
158
}
159

160
// InterceptableSwitchConfig contains the configuration of InterceptableSwitch.
161
type InterceptableSwitchConfig struct {
162
        // Switch is a reference to the actual switch implementation that
163
        // packets get sent to on resume.
164
        Switch *Switch
165

166
        // Notifier is an instance of a chain notifier that we'll use to signal
167
        // the switch when a new block has arrived.
168
        Notifier chainntnfs.ChainNotifier
169

170
        // CltvRejectDelta defines the number of blocks before the expiry of the
171
        // htlc where we auto-fail an intercepted htlc to prevent channel
172
        // force-closure.
173
        CltvRejectDelta uint32
174

175
        // CltvInterceptDelta defines the number of blocks before the expiry of
176
        // the htlc where we don't intercept anymore. This value must be greater
177
        // than CltvRejectDelta, because we don't want to offer htlcs to the
178
        // interceptor client for which there is no time left to resolve them
179
        // anymore.
180
        CltvInterceptDelta uint32
181

182
        // RequireInterceptor indicates whether processing should block if no
183
        // interceptor is connected.
184
        RequireInterceptor bool
185
}
186

187
// NewInterceptableSwitch returns an instance of InterceptableSwitch.
188
func NewInterceptableSwitch(cfg *InterceptableSwitchConfig) (
189
        *InterceptableSwitch, error) {
3✔
190

3✔
191
        if cfg.CltvInterceptDelta <= cfg.CltvRejectDelta {
3✔
192
                return nil, fmt.Errorf("cltv intercept delta %v not greater "+
×
193
                        "than cltv reject delta %v",
×
194
                        cfg.CltvInterceptDelta, cfg.CltvRejectDelta)
×
195
        }
×
196

197
        return &InterceptableSwitch{
3✔
198
                htlcSwitch:              cfg.Switch,
3✔
199
                intercepted:             make(chan *interceptedPackets),
3✔
200
                onchainIntercepted:      make(chan InterceptedForward),
3✔
201
                interceptorRegistration: make(chan ForwardInterceptor),
3✔
202
                heldHtlcSet:             newHeldHtlcSet(),
3✔
203
                resolutionChan:          make(chan *fwdResolution),
3✔
204
                requireInterceptor:      cfg.RequireInterceptor,
3✔
205
                cltvRejectDelta:         cfg.CltvRejectDelta,
3✔
206
                cltvInterceptDelta:      cfg.CltvInterceptDelta,
3✔
207
                notifier:                cfg.Notifier,
3✔
208

3✔
209
                quit: make(chan struct{}),
3✔
210
        }, nil
3✔
211
}
212

213
// SetInterceptor sets the ForwardInterceptor to be used. A nil argument
214
// unregisters the current interceptor.
215
func (s *InterceptableSwitch) SetInterceptor(
216
        interceptor ForwardInterceptor) {
3✔
217

3✔
218
        // Synchronize setting the handler with the main loop to prevent race
3✔
219
        // conditions.
3✔
220
        select {
3✔
221
        case s.interceptorRegistration <- interceptor:
3✔
222

223
        case <-s.quit:
×
224
        }
225
}
226

227
func (s *InterceptableSwitch) Start() error {
3✔
228
        log.Info("InterceptableSwitch starting...")
3✔
229

3✔
230
        if s.started.Swap(true) {
3✔
231
                return fmt.Errorf("InterceptableSwitch started more than once")
×
232
        }
×
233

234
        ctx, cancel := context.WithCancel(context.Background())
3✔
235
        s.cancel = fn.Some(cancel)
3✔
236

3✔
237
        blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
3✔
238
        if err != nil {
3✔
239
                return err
×
240
        }
×
241
        s.blockEpochStream = blockEpochStream
3✔
242

3✔
243
        s.wg.Add(1)
3✔
244
        go func() {
6✔
245
                defer s.wg.Done()
3✔
246

3✔
247
                err := s.run(ctx)
3✔
248
                if err != nil {
3✔
249
                        log.Errorf("InterceptableSwitch stopped: %v", err)
×
250
                }
×
251
        }()
252

253
        log.Debug("InterceptableSwitch started")
3✔
254

3✔
255
        return nil
3✔
256
}
257

258
func (s *InterceptableSwitch) Stop() error {
3✔
259
        log.Info("InterceptableSwitch shutting down...")
3✔
260

3✔
261
        if s.stopped.Swap(true) {
3✔
262
                return fmt.Errorf("InterceptableSwitch stopped more than once")
×
263
        }
×
264

265
        s.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
6✔
266
        close(s.quit)
3✔
267
        s.wg.Wait()
3✔
268

3✔
269
        // We need to check whether the start routine run and initialized the
3✔
270
        // `blockEpochStream`.
3✔
271
        if s.blockEpochStream != nil {
6✔
272
                s.blockEpochStream.Cancel()
3✔
273
        }
3✔
274

275
        log.Debug("InterceptableSwitch shutdown complete")
3✔
276

3✔
277
        return nil
3✔
278
}
279

280
func (s *InterceptableSwitch) run(ctx context.Context) error {
3✔
281
        // The block epoch stream will immediately stream the current height.
3✔
282
        // Read it out here.
3✔
283
        select {
3✔
284
        case currentBlock, ok := <-s.blockEpochStream.Epochs:
3✔
285
                if !ok {
3✔
286
                        return errBlockStreamStopped
×
287
                }
×
288
                s.currentHeight = currentBlock.Height
3✔
289

290
        case <-s.quit:
×
291
                return nil
×
292

NEW
293
        case <-ctx.Done():
×
NEW
294
                return nil
×
295
        }
296

297
        log.Debugf("InterceptableSwitch running: height=%v, "+
3✔
298
                "requireInterceptor=%v", s.currentHeight, s.requireInterceptor)
3✔
299

3✔
300
        for {
6✔
301
                select {
3✔
302
                // An interceptor registration or de-registration came in.
303
                case interceptor := <-s.interceptorRegistration:
3✔
304
                        s.setInterceptor(interceptor)
3✔
305

306
                case packets := <-s.intercepted:
3✔
307
                        var notIntercepted []*htlcPacket
3✔
308
                        for _, p := range packets.packets {
6✔
309
                                intercepted, err := s.interceptForward(
3✔
310
                                        ctx, p, packets.isReplay,
3✔
311
                                )
3✔
312
                                if err != nil {
3✔
313
                                        return err
×
314
                                }
×
315

316
                                if !intercepted {
6✔
317
                                        notIntercepted = append(
3✔
318
                                                notIntercepted, p,
3✔
319
                                        )
3✔
320
                                }
3✔
321
                        }
322
                        err := s.htlcSwitch.ForwardPackets(
3✔
323
                                ctx, packets.linkQuit, notIntercepted...,
3✔
324
                        )
3✔
325
                        if err != nil {
3✔
326
                                log.Errorf("Cannot forward packets: %v", err)
×
327
                        }
×
328

329
                case fwd := <-s.onchainIntercepted:
3✔
330
                        // For on-chain interceptions, we don't know if it has
3✔
331
                        // already been offered before. This information is in
3✔
332
                        // the forwarding package which isn't easily accessible
3✔
333
                        // from contractcourt. It is likely though that it was
3✔
334
                        // already intercepted in the off-chain flow. And even
3✔
335
                        // if not, it is safe to signal replay so that we won't
3✔
336
                        // unexpectedly skip over this htlc.
3✔
337
                        if _, err := s.forward(ctx, fwd, true); err != nil {
3✔
338
                                return err
×
339
                        }
×
340

341
                case res := <-s.resolutionChan:
3✔
342
                        res.errChan <- s.resolve(ctx, res.resolution)
3✔
343

344
                case currentBlock, ok := <-s.blockEpochStream.Epochs:
3✔
345
                        if !ok {
3✔
346
                                return errBlockStreamStopped
×
347
                        }
×
348

349
                        s.currentHeight = currentBlock.Height
3✔
350

3✔
351
                        // A new block is appended. Fail any held htlcs that
3✔
352
                        // expire at this height to prevent channel force-close.
3✔
353
                        s.failExpiredHtlcs(ctx)
3✔
354

355
                case <-s.quit:
×
356
                        return nil
×
357

358
                case <-ctx.Done():
3✔
359
                        return nil
3✔
360
                }
361
        }
362
}
363

364
func (s *InterceptableSwitch) failExpiredHtlcs(ctx context.Context) {
3✔
365
        s.heldHtlcSet.popAutoFails(
3✔
366
                uint32(s.currentHeight),
3✔
367
                func(fwd InterceptedForward) {
3✔
368
                        err := fwd.FailWithCode(
×
NEW
369
                                ctx, lnwire.CodeTemporaryChannelFailure,
×
370
                        )
×
371
                        if err != nil {
×
372
                                log.Errorf("Cannot fail packet: %v", err)
×
373
                        }
×
374
                },
375
        )
376
}
377

378
func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
3✔
379
        err := s.interceptor(fwd.Packet())
3✔
380
        if err != nil {
3✔
381
                // Only log the error. If we couldn't send the packet, we assume
×
382
                // that the interceptor will reconnect so that we can retry.
×
383
                log.Debugf("Interceptor cannot handle forward: %v", err)
×
384
        }
×
385
}
386

387
func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) {
3✔
388
        s.interceptor = interceptor
3✔
389

3✔
390
        // Replay all currently held htlcs. When an interceptor is not required,
3✔
391
        // there may be none because they've been cleared after the previous
3✔
392
        // disconnect.
3✔
393
        if interceptor != nil {
6✔
394
                log.Debugf("Interceptor connected")
3✔
395

3✔
396
                s.heldHtlcSet.forEach(s.sendForward)
3✔
397

3✔
398
                return
3✔
399
        }
3✔
400

401
        // The interceptor disconnects. If an interceptor is required, keep the
402
        // held htlcs.
403
        if s.requireInterceptor {
6✔
404
                log.Infof("Interceptor disconnected, retaining held packets")
3✔
405

3✔
406
                return
3✔
407
        }
3✔
408

409
        // Interceptor is not required. Release held forwards.
410
        log.Infof("Interceptor disconnected, resolving held packets")
3✔
411

3✔
412
        s.heldHtlcSet.popAll(func(fwd InterceptedForward) {
6✔
413
                err := fwd.Resume()
3✔
414
                if err != nil {
3✔
415
                        log.Errorf("Failed to resume hold forward %v", err)
×
416
                }
×
417
        })
418
}
419

420
// resolve processes a HTLC given the resolution type specified by the
421
// intercepting client.
422
func (s *InterceptableSwitch) resolve(ctx context.Context,
423
        res *FwdResolution) error {
3✔
424

3✔
425
        intercepted, err := s.heldHtlcSet.pop(res.Key)
3✔
426
        if err != nil {
3✔
427
                return err
×
428
        }
×
429

430
        switch res.Action {
3✔
431
        case FwdActionResume:
3✔
432
                return intercepted.Resume()
3✔
433

434
        case FwdActionResumeModified:
3✔
435
                return intercepted.ResumeModified(
3✔
436
                        res.InAmountMsat, res.OutAmountMsat,
3✔
437
                        res.OutWireCustomRecords,
3✔
438
                )
3✔
439

440
        case FwdActionSettle:
3✔
441
                return intercepted.Settle(res.Preimage)
3✔
442

443
        case FwdActionFail:
3✔
444
                if len(res.FailureMessage) > 0 {
3✔
445
                        return intercepted.Fail(res.FailureMessage)
×
446
                }
×
447

448
                return intercepted.FailWithCode(ctx, res.FailureCode)
3✔
449

450
        default:
×
451
                return fmt.Errorf("unrecognized action %v", res.Action)
×
452
        }
453
}
454

455
// Resolve resolves an intercepted packet.
456
func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
3✔
457
        internalRes := &fwdResolution{
3✔
458
                resolution: res,
3✔
459
                errChan:    make(chan error, 1),
3✔
460
        }
3✔
461

3✔
462
        select {
3✔
463
        case s.resolutionChan <- internalRes:
3✔
464

465
        case <-s.quit:
×
466
                return errors.New("switch shutting down")
×
467
        }
468

469
        select {
3✔
470
        case err := <-internalRes.errChan:
3✔
471
                return err
3✔
472

473
        case <-s.quit:
×
474
                return errors.New("switch shutting down")
×
475
        }
476
}
477

478
// ForwardPackets attempts to forward the batch of htlcs to a connected
479
// interceptor. If the interceptor signals the resume action, the htlcs are
480
// forwarded to the switch. The link's quit signal should be provided to allow
481
// cancellation of forwarding during link shutdown.
482
func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{},
483
        isReplay bool, packets ...*htlcPacket) error {
3✔
484

3✔
485
        // Synchronize with the main event loop. This should be light in the
3✔
486
        // case where there is no interceptor.
3✔
487
        select {
3✔
488
        case s.intercepted <- &interceptedPackets{
489
                packets:  packets,
490
                linkQuit: linkQuit,
491
                isReplay: isReplay,
492
        }:
3✔
493

494
        case <-linkQuit:
1✔
495
                log.Debugf("Forward cancelled because link quit")
1✔
496

497
        case <-s.quit:
×
498
                return errors.New("interceptable switch quit")
×
499
        }
500

501
        return nil
3✔
502
}
503

504
// ForwardPacket forwards a single htlc to the external interceptor.
505
func (s *InterceptableSwitch) ForwardPacket(
506
        fwd InterceptedForward) error {
3✔
507

3✔
508
        select {
3✔
509
        case s.onchainIntercepted <- fwd:
3✔
510

511
        case <-s.quit:
×
512
                return errors.New("interceptable switch quit")
×
513
        }
514

515
        return nil
3✔
516
}
517

518
// interceptForward forwards the packet to the external interceptor after
519
// checking the interception criteria.
520
func (s *InterceptableSwitch) interceptForward(ctx context.Context,
521
        packet *htlcPacket, isReplay bool) (bool, error) {
3✔
522

3✔
523
        switch htlc := packet.htlc.(type) {
3✔
524
        case *lnwire.UpdateAddHTLC:
3✔
525
                // We are not interested in intercepting initiated payments.
3✔
526
                if packet.incomingChanID == hop.Source {
3✔
527
                        return false, nil
×
528
                }
×
529

530
                intercepted := &interceptedForward{
3✔
531
                        htlc:       htlc,
3✔
532
                        packet:     packet,
3✔
533
                        htlcSwitch: s.htlcSwitch,
3✔
534
                        autoFailHeight: int32(packet.incomingTimeout -
3✔
535
                                s.cltvRejectDelta),
3✔
536
                }
3✔
537

3✔
538
                // Handle forwards that are too close to expiry.
3✔
539
                handled, err := s.handleExpired(ctx, intercepted)
3✔
540
                if err != nil {
3✔
541
                        log.Errorf("Error handling intercepted htlc "+
×
542
                                "that expires too soon: circuit=%v, "+
×
543
                                "incoming_timeout=%v, err=%v",
×
544
                                packet.inKey(), packet.incomingTimeout, err)
×
545

×
546
                        // Return false so that the packet is offered as normal
×
547
                        // to the switch. This isn't ideal because interception
×
548
                        // may be configured as always-on and is skipped now.
×
549
                        // Returning true isn't great either, because the htlc
×
550
                        // will remain stuck and potentially force-close the
×
551
                        // channel. But in the end, we should never get here, so
×
552
                        // the actual return value doesn't matter that much.
×
553
                        return false, nil
×
554
                }
×
555
                if handled {
3✔
556
                        return true, nil
×
557
                }
×
558

559
                return s.forward(ctx, intercepted, isReplay)
3✔
560

561
        default:
3✔
562
                return false, nil
3✔
563
        }
564
}
565

566
// forward records the intercepted htlc and forwards it to the interceptor.
567
func (s *InterceptableSwitch) forward(ctx context.Context,
568
        fwd InterceptedForward, isReplay bool) (bool, error) {
3✔
569

3✔
570
        inKey := fwd.Packet().IncomingCircuit
3✔
571

3✔
572
        // Ignore already held htlcs.
3✔
573
        if s.heldHtlcSet.exists(inKey) {
6✔
574
                return true, nil
3✔
575
        }
3✔
576

577
        // If there is no interceptor currently registered, configuration and packet
578
        // replay status determine how the packet is handled.
579
        if s.interceptor == nil {
6✔
580
                // Process normally if an interceptor is not required.
3✔
581
                if !s.requireInterceptor {
6✔
582
                        return false, nil
3✔
583
                }
3✔
584

585
                // We are in interceptor-required mode. If this is a new packet, it is
586
                // still safe to fail back. The interceptor has never seen this packet
587
                // yet. This limits the backlog of htlcs when the interceptor is down.
588
                if !isReplay {
3✔
589
                        err := fwd.FailWithCode(
×
NEW
590
                                ctx, lnwire.CodeTemporaryChannelFailure,
×
591
                        )
×
592
                        if err != nil {
×
593
                                log.Errorf("Cannot fail packet: %v", err)
×
594
                        }
×
595

596
                        return true, nil
×
597
                }
598

599
                // This packet is a replay. It is not safe to fail back, because the
600
                // interceptor may still signal otherwise upon reconnect. Keep the
601
                // packet in the queue until then.
602
                if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
3✔
603
                        return false, err
×
604
                }
×
605

606
                return true, nil
3✔
607
        }
608

609
        // There is an interceptor registered. We can forward the packet right now.
610
        // Hold it in the queue too to track what is outstanding.
611
        if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
3✔
612
                return false, err
×
613
        }
×
614

615
        s.sendForward(fwd)
3✔
616

3✔
617
        return true, nil
3✔
618
}
619

620
// handleExpired checks that the htlc isn't too close to the channel
621
// force-close broadcast height. If it is, it is cancelled back.
622
func (s *InterceptableSwitch) handleExpired(ctx context.Context,
623
        fwd *interceptedForward) (bool, error) {
3✔
624

3✔
625
        height := uint32(s.currentHeight)
3✔
626
        if fwd.packet.incomingTimeout >= height+s.cltvInterceptDelta {
6✔
627
                return false, nil
3✔
628
        }
3✔
629

630
        log.Debugf("Interception rejected because htlc "+
×
631
                "expires too soon: circuit=%v, "+
×
632
                "height=%v, incoming_timeout=%v",
×
633
                fwd.packet.inKey(), height,
×
634
                fwd.packet.incomingTimeout)
×
635

×
636
        err := fwd.FailWithCode(
×
NEW
637
                ctx, lnwire.CodeExpiryTooSoon,
×
638
        )
×
639
        if err != nil {
×
640
                return false, err
×
641
        }
×
642

643
        return true, nil
×
644
}
645

646
// interceptedForward implements the InterceptedForward interface.
647
// It is passed from the switch to external interceptors that are interested
648
// in holding forwards and resolve them manually.
649
type interceptedForward struct {
650
        htlc           *lnwire.UpdateAddHTLC
651
        packet         *htlcPacket
652
        htlcSwitch     *Switch
653
        autoFailHeight int32
654
}
655

656
// Packet returns the intercepted htlc packet.
657
func (f *interceptedForward) Packet() InterceptedPacket {
3✔
658
        return InterceptedPacket{
3✔
659
                IncomingCircuit: models.CircuitKey{
3✔
660
                        ChanID: f.packet.incomingChanID,
3✔
661
                        HtlcID: f.packet.incomingHTLCID,
3✔
662
                },
3✔
663
                OutgoingChanID:       f.packet.outgoingChanID,
3✔
664
                Hash:                 f.htlc.PaymentHash,
3✔
665
                OutgoingExpiry:       f.htlc.Expiry,
3✔
666
                OutgoingAmount:       f.htlc.Amount,
3✔
667
                IncomingAmount:       f.packet.incomingAmount,
3✔
668
                IncomingExpiry:       f.packet.incomingTimeout,
3✔
669
                InOnionCustomRecords: f.packet.inOnionCustomRecords,
3✔
670
                OnionBlob:            f.htlc.OnionBlob,
3✔
671
                AutoFailHeight:       f.autoFailHeight,
3✔
672
                InWireCustomRecords:  f.packet.inWireCustomRecords,
3✔
673
        }
3✔
674
}
3✔
675

676
// Resume resumes the default behavior as if the packet was not intercepted.
677
func (f *interceptedForward) Resume() error {
3✔
678
        ctx := context.TODO()
3✔
679
        // Forward to the switch. A link quit channel isn't needed, because we
3✔
680
        // are on a different thread now.
3✔
681
        return f.htlcSwitch.ForwardPackets(ctx, nil, f.packet)
3✔
682
}
3✔
683

684
// ResumeModified resumes the default behavior with field modifications. The
685
// input amount (if provided) specifies that the value of the inbound HTLC
686
// should be interpreted differently from the on-chain amount during further
687
// validation. The presence of an output amount and/or custom records indicates
688
// that those values should be modified on the outgoing HTLC.
689
func (f *interceptedForward) ResumeModified(
690
        inAmountMsat fn.Option[lnwire.MilliSatoshi],
691
        outAmountMsat fn.Option[lnwire.MilliSatoshi],
692
        outWireCustomRecords fn.Option[lnwire.CustomRecords]) error {
3✔
693

3✔
694
        ctx := context.TODO()
3✔
695

3✔
696
        // Convert the optional custom records to the correct type and validate
3✔
697
        // them.
3✔
698
        validatedRecords, err := fn.MapOptionZ(
3✔
699
                outWireCustomRecords,
3✔
700
                func(cr lnwire.CustomRecords) fn.Result[lnwire.CustomRecords] {
6✔
701
                        if len(cr) == 0 {
3✔
702
                                return fn.Ok[lnwire.CustomRecords](nil)
×
703
                        }
×
704

705
                        // Type cast and validate custom records.
706
                        err := cr.Validate()
3✔
707
                        if err != nil {
3✔
708
                                return fn.Err[lnwire.CustomRecords](
×
709
                                        fmt.Errorf("failed to validate "+
×
710
                                                "custom records: %w", err),
×
711
                                )
×
712
                        }
×
713

714
                        return fn.Ok(cr)
3✔
715
                },
716
        ).Unpack()
717
        if err != nil {
3✔
718
                return fmt.Errorf("failed to encode custom records: %w",
×
719
                        err)
×
720
        }
×
721

722
        // Set the incoming amount, if it is provided, on the packet.
723
        inAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
3✔
724
                f.packet.incomingAmount = amount
×
725
        })
×
726

727
        // Modify the wire message contained in the packet.
728
        switch htlc := f.packet.htlc.(type) {
3✔
729
        case *lnwire.UpdateAddHTLC:
3✔
730
                outAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
6✔
731
                        f.packet.amount = amount
3✔
732
                        htlc.Amount = amount
3✔
733
                })
3✔
734

735
                // Merge custom records with any validated records that were
736
                // added in the modify request, overwriting any existing values
737
                // with those supplied in the modifier API.
738
                htlc.CustomRecords = htlc.CustomRecords.MergedCopy(
3✔
739
                        validatedRecords,
3✔
740
                )
3✔
741

742
        case *lnwire.UpdateFulfillHTLC:
×
743
                if len(validatedRecords) > 0 {
×
744
                        htlc.CustomRecords = validatedRecords
×
745
                }
×
746
        }
747

748
        log.Tracef("Forwarding packet %v", lnutils.SpewLogClosure(f.packet))
3✔
749

3✔
750
        // Forward to the switch. A link quit channel isn't needed, because we
3✔
751
        // are on a different thread now.
3✔
752
        return f.htlcSwitch.ForwardPackets(ctx, nil, f.packet)
3✔
753
}
754

755
// Fail notifies the intention to Fail an existing hold forward with an
756
// encrypted failure reason.
757
func (f *interceptedForward) Fail(reason []byte) error {
×
758
        obfuscatedReason := f.packet.obfuscator.IntermediateEncrypt(reason)
×
759

×
760
        return f.resolve(&lnwire.UpdateFailHTLC{
×
761
                Reason: obfuscatedReason,
×
762
        })
×
763
}
×
764

765
// FailWithCode notifies the intention to fail an existing hold forward with the
766
// specified failure code.
767
func (f *interceptedForward) FailWithCode(ctx context.Context,
768
        code lnwire.FailCode) error {
3✔
769

3✔
770
        shaOnionBlob := func() [32]byte {
3✔
771
                return sha256.Sum256(f.htlc.OnionBlob[:])
×
772
        }
×
773

774
        // Create a local failure.
775
        var failureMsg lnwire.FailureMessage
3✔
776

3✔
777
        switch code {
3✔
778
        case lnwire.CodeInvalidOnionVersion:
×
779
                failureMsg = &lnwire.FailInvalidOnionVersion{
×
780
                        OnionSHA256: shaOnionBlob(),
×
781
                }
×
782

783
        case lnwire.CodeInvalidOnionHmac:
×
784
                failureMsg = &lnwire.FailInvalidOnionHmac{
×
785
                        OnionSHA256: shaOnionBlob(),
×
786
                }
×
787

788
        case lnwire.CodeInvalidOnionKey:
×
789
                failureMsg = &lnwire.FailInvalidOnionKey{
×
790
                        OnionSHA256: shaOnionBlob(),
×
791
                }
×
792

793
        case lnwire.CodeTemporaryChannelFailure:
3✔
794
                update := f.htlcSwitch.failAliasUpdate(
3✔
795
                        ctx, f.packet.incomingChanID, true,
3✔
796
                )
3✔
797
                if update == nil {
6✔
798
                        // Fallback to the original, non-alias behavior.
3✔
799
                        var err error
3✔
800
                        update, err = f.htlcSwitch.cfg.FetchLastChannelUpdate(
3✔
801
                                ctx, f.packet.incomingChanID,
3✔
802
                        )
3✔
803
                        if err != nil {
3✔
804
                                return err
×
805
                        }
×
806
                }
807

808
                failureMsg = lnwire.NewTemporaryChannelFailure(update)
3✔
809

810
        case lnwire.CodeExpiryTooSoon:
×
811
                update, err := f.htlcSwitch.cfg.FetchLastChannelUpdate(
×
NEW
812
                        ctx, f.packet.incomingChanID,
×
813
                )
×
814
                if err != nil {
×
815
                        return err
×
816
                }
×
817

818
                failureMsg = lnwire.NewExpiryTooSoon(*update)
×
819

820
        default:
×
821
                return ErrUnsupportedFailureCode
×
822
        }
823

824
        // Encrypt the failure for the first hop. This node will be the origin
825
        // of the failure.
826
        reason, err := f.packet.obfuscator.EncryptFirstHop(failureMsg)
3✔
827
        if err != nil {
3✔
828
                return fmt.Errorf("failed to encrypt failure reason %w", err)
×
829
        }
×
830

831
        return f.resolve(&lnwire.UpdateFailHTLC{
3✔
832
                Reason: reason,
3✔
833
        })
3✔
834
}
835

836
// Settle forwards a settled packet to the switch.
837
func (f *interceptedForward) Settle(preimage lntypes.Preimage) error {
3✔
838
        if !preimage.Matches(f.htlc.PaymentHash) {
3✔
839
                return errors.New("preimage does not match hash")
×
840
        }
×
841
        return f.resolve(&lnwire.UpdateFulfillHTLC{
3✔
842
                PaymentPreimage: preimage,
3✔
843
        })
3✔
844
}
845

846
// resolve is used for both Settle and Fail and forwards the message to the
847
// switch.
848
func (f *interceptedForward) resolve(message lnwire.Message) error {
3✔
849
        pkt := &htlcPacket{
3✔
850
                incomingChanID: f.packet.incomingChanID,
3✔
851
                incomingHTLCID: f.packet.incomingHTLCID,
3✔
852
                outgoingChanID: f.packet.outgoingChanID,
3✔
853
                outgoingHTLCID: f.packet.outgoingHTLCID,
3✔
854
                isResolution:   true,
3✔
855
                circuit:        f.packet.circuit,
3✔
856
                htlc:           message,
3✔
857
                obfuscator:     f.packet.obfuscator,
3✔
858
                sourceRef:      f.packet.sourceRef,
3✔
859
        }
3✔
860
        return f.htlcSwitch.mailOrchestrator.Deliver(pkt.incomingChanID, pkt)
3✔
861
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc