• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12293074449

12 Dec 2024 09:03AM UTC coverage: 49.538% (-0.002%) from 49.54%
12293074449

push

github

web-flow
Merge pull request #9341 from ellemouton/fnContext

fn: Remove ctx from GoroutineManager constructor

100372 of 202617 relevant lines covered (49.54%)

2.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.26
/htlcswitch/interceptable_switch.go
1
package htlcswitch
2

3
import (
4
        "crypto/sha256"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/go-errors/errors"
10
        "github.com/lightningnetwork/lnd/chainntnfs"
11
        "github.com/lightningnetwork/lnd/fn/v2"
12
        "github.com/lightningnetwork/lnd/graph/db/models"
13
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
14
        "github.com/lightningnetwork/lnd/lntypes"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
20
        // ErrFwdNotExists is an error returned when the caller tries to resolve
21
        // a forward that doesn't exist anymore.
22
        ErrFwdNotExists = errors.New("forward does not exist")
23

24
        // ErrUnsupportedFailureCode when processing of an unsupported failure
25
        // code is attempted.
26
        ErrUnsupportedFailureCode = errors.New("unsupported failure code")
27

28
        errBlockStreamStopped = errors.New("block epoch stream stopped")
29
)
30

31
// InterceptableSwitch is an implementation of ForwardingSwitch interface.
32
// This implementation is used like a proxy that wraps the switch and
33
// intercepts forward requests. A reference to the Switch is held in order
34
// to communicate back the interception result where the options are:
35
// Resume - forwards the original request to the switch as is.
36
// Settle - routes UpdateFulfillHTLC to the originating link.
37
// Fail - routes UpdateFailHTLC to the originating link.
38
type InterceptableSwitch struct {
39
        started atomic.Bool
40
        stopped atomic.Bool
41

42
        // htlcSwitch is the underline switch
43
        htlcSwitch *Switch
44

45
        // intercepted is where we stream all intercepted packets coming from
46
        // the switch.
47
        intercepted chan *interceptedPackets
48

49
        // resolutionChan is where we stream all responses coming from the
50
        // interceptor client.
51
        resolutionChan chan *fwdResolution
52

53
        onchainIntercepted chan InterceptedForward
54

55
        // interceptorRegistration is a channel that we use to synchronize
56
        // client connect and disconnect.
57
        interceptorRegistration chan ForwardInterceptor
58

59
        // requireInterceptor indicates whether processing should block if no
60
        // interceptor is connected.
61
        requireInterceptor bool
62

63
        // interceptor is the handler for intercepted packets.
64
        interceptor ForwardInterceptor
65

66
        // heldHtlcSet keeps track of outstanding intercepted forwards.
67
        heldHtlcSet *heldHtlcSet
68

69
        // cltvRejectDelta defines the number of blocks before the expiry of the
70
        // htlc where we no longer intercept it and instead cancel it back.
71
        cltvRejectDelta uint32
72

73
        // cltvInterceptDelta defines the number of blocks before the expiry of
74
        // the htlc where we don't intercept anymore. This value must be greater
75
        // than CltvRejectDelta, because we don't want to offer htlcs to the
76
        // interceptor client for which there is no time left to resolve them
77
        // anymore.
78
        cltvInterceptDelta uint32
79

80
        // notifier is an instance of a chain notifier that we'll use to signal
81
        // the switch when a new block has arrived.
82
        notifier chainntnfs.ChainNotifier
83

84
        // blockEpochStream is an active block epoch event stream backed by an
85
        // active ChainNotifier instance. This will be used to retrieve the
86
        // latest height of the chain.
87
        blockEpochStream *chainntnfs.BlockEpochEvent
88

89
        // currentHeight is the currently best known height.
90
        currentHeight int32
91

92
        wg   sync.WaitGroup
93
        quit chan struct{}
94
}
95

96
type interceptedPackets struct {
97
        packets  []*htlcPacket
98
        linkQuit <-chan struct{}
99
        isReplay bool
100
}
101

102
// FwdAction defines the various resolution types.
103
type FwdAction int
104

105
const (
106
        // FwdActionResume forwards the intercepted packet to the switch.
107
        FwdActionResume FwdAction = iota
108

109
        // FwdActionSettle settles the intercepted packet with a preimage.
110
        FwdActionSettle
111

112
        // FwdActionFail fails the intercepted packet back to the sender.
113
        FwdActionFail
114

115
        // FwdActionResumeModified forwards the intercepted packet to the switch
116
        // with modifications.
117
        FwdActionResumeModified
118
)
119

120
// FwdResolution defines the action to be taken on an intercepted packet.
121
type FwdResolution struct {
122
        // Key is the incoming circuit key of the htlc.
123
        Key models.CircuitKey
124

125
        // Action is the action to take on the intercepted htlc.
126
        Action FwdAction
127

128
        // Preimage is the preimage that is to be used for settling if Action is
129
        // FwdActionSettle.
130
        Preimage lntypes.Preimage
131

132
        // InAmountMsat is the amount that is to be used for validating if
133
        // Action is FwdActionResumeModified.
134
        InAmountMsat fn.Option[lnwire.MilliSatoshi]
135

136
        // OutAmountMsat is the amount that is to be used for forwarding if
137
        // Action is FwdActionResumeModified.
138
        OutAmountMsat fn.Option[lnwire.MilliSatoshi]
139

140
        // OutWireCustomRecords is the custom records that are to be used for
141
        // forwarding if Action is FwdActionResumeModified.
142
        OutWireCustomRecords fn.Option[lnwire.CustomRecords]
143

144
        // FailureMessage is the encrypted failure message that is to be passed
145
        // back to the sender if action is FwdActionFail.
146
        FailureMessage []byte
147

148
        // FailureCode is the failure code that is to be passed back to the
149
        // sender if action is FwdActionFail.
150
        FailureCode lnwire.FailCode
151
}
152

153
type fwdResolution struct {
154
        resolution *FwdResolution
155
        errChan    chan error
156
}
157

158
// InterceptableSwitchConfig contains the configuration of InterceptableSwitch.
159
type InterceptableSwitchConfig struct {
160
        // Switch is a reference to the actual switch implementation that
161
        // packets get sent to on resume.
162
        Switch *Switch
163

164
        // Notifier is an instance of a chain notifier that we'll use to signal
165
        // the switch when a new block has arrived.
166
        Notifier chainntnfs.ChainNotifier
167

168
        // CltvRejectDelta defines the number of blocks before the expiry of the
169
        // htlc where we auto-fail an intercepted htlc to prevent channel
170
        // force-closure.
171
        CltvRejectDelta uint32
172

173
        // CltvInterceptDelta defines the number of blocks before the expiry of
174
        // the htlc where we don't intercept anymore. This value must be greater
175
        // than CltvRejectDelta, because we don't want to offer htlcs to the
176
        // interceptor client for which there is no time left to resolve them
177
        // anymore.
178
        CltvInterceptDelta uint32
179

180
        // RequireInterceptor indicates whether processing should block if no
181
        // interceptor is connected.
182
        RequireInterceptor bool
183
}
184

185
// NewInterceptableSwitch returns an instance of InterceptableSwitch.
186
func NewInterceptableSwitch(cfg *InterceptableSwitchConfig) (
187
        *InterceptableSwitch, error) {
4✔
188

4✔
189
        if cfg.CltvInterceptDelta <= cfg.CltvRejectDelta {
4✔
190
                return nil, fmt.Errorf("cltv intercept delta %v not greater "+
×
191
                        "than cltv reject delta %v",
×
192
                        cfg.CltvInterceptDelta, cfg.CltvRejectDelta)
×
193
        }
×
194

195
        return &InterceptableSwitch{
4✔
196
                htlcSwitch:              cfg.Switch,
4✔
197
                intercepted:             make(chan *interceptedPackets),
4✔
198
                onchainIntercepted:      make(chan InterceptedForward),
4✔
199
                interceptorRegistration: make(chan ForwardInterceptor),
4✔
200
                heldHtlcSet:             newHeldHtlcSet(),
4✔
201
                resolutionChan:          make(chan *fwdResolution),
4✔
202
                requireInterceptor:      cfg.RequireInterceptor,
4✔
203
                cltvRejectDelta:         cfg.CltvRejectDelta,
4✔
204
                cltvInterceptDelta:      cfg.CltvInterceptDelta,
4✔
205
                notifier:                cfg.Notifier,
4✔
206

4✔
207
                quit: make(chan struct{}),
4✔
208
        }, nil
4✔
209
}
210

211
// SetInterceptor sets the ForwardInterceptor to be used. A nil argument
212
// unregisters the current interceptor.
213
func (s *InterceptableSwitch) SetInterceptor(
214
        interceptor ForwardInterceptor) {
4✔
215

4✔
216
        // Synchronize setting the handler with the main loop to prevent race
4✔
217
        // conditions.
4✔
218
        select {
4✔
219
        case s.interceptorRegistration <- interceptor:
4✔
220

221
        case <-s.quit:
×
222
        }
223
}
224

225
func (s *InterceptableSwitch) Start() error {
4✔
226
        log.Info("InterceptableSwitch starting...")
4✔
227

4✔
228
        if s.started.Swap(true) {
4✔
229
                return fmt.Errorf("InterceptableSwitch started more than once")
×
230
        }
×
231

232
        blockEpochStream, err := s.notifier.RegisterBlockEpochNtfn(nil)
4✔
233
        if err != nil {
4✔
234
                return err
×
235
        }
×
236
        s.blockEpochStream = blockEpochStream
4✔
237

4✔
238
        s.wg.Add(1)
4✔
239
        go func() {
8✔
240
                defer s.wg.Done()
4✔
241

4✔
242
                err := s.run()
4✔
243
                if err != nil {
4✔
244
                        log.Errorf("InterceptableSwitch stopped: %v", err)
×
245
                }
×
246
        }()
247

248
        log.Debug("InterceptableSwitch started")
4✔
249

4✔
250
        return nil
4✔
251
}
252

253
func (s *InterceptableSwitch) Stop() error {
4✔
254
        log.Info("InterceptableSwitch shutting down...")
4✔
255

4✔
256
        if s.stopped.Swap(true) {
4✔
257
                return fmt.Errorf("InterceptableSwitch stopped more than once")
×
258
        }
×
259

260
        close(s.quit)
4✔
261
        s.wg.Wait()
4✔
262

4✔
263
        // We need to check whether the start routine run and initialized the
4✔
264
        // `blockEpochStream`.
4✔
265
        if s.blockEpochStream != nil {
8✔
266
                s.blockEpochStream.Cancel()
4✔
267
        }
4✔
268

269
        log.Debug("InterceptableSwitch shutdown complete")
4✔
270

4✔
271
        return nil
4✔
272
}
273

274
func (s *InterceptableSwitch) run() error {
4✔
275
        // The block epoch stream will immediately stream the current height.
4✔
276
        // Read it out here.
4✔
277
        select {
4✔
278
        case currentBlock, ok := <-s.blockEpochStream.Epochs:
4✔
279
                if !ok {
4✔
280
                        return errBlockStreamStopped
×
281
                }
×
282
                s.currentHeight = currentBlock.Height
4✔
283

284
        case <-s.quit:
×
285
                return nil
×
286
        }
287

288
        log.Debugf("InterceptableSwitch running: height=%v, "+
4✔
289
                "requireInterceptor=%v", s.currentHeight, s.requireInterceptor)
4✔
290

4✔
291
        for {
8✔
292
                select {
4✔
293
                // An interceptor registration or de-registration came in.
294
                case interceptor := <-s.interceptorRegistration:
4✔
295
                        s.setInterceptor(interceptor)
4✔
296

297
                case packets := <-s.intercepted:
4✔
298
                        var notIntercepted []*htlcPacket
4✔
299
                        for _, p := range packets.packets {
8✔
300
                                intercepted, err := s.interceptForward(
4✔
301
                                        p, packets.isReplay,
4✔
302
                                )
4✔
303
                                if err != nil {
4✔
304
                                        return err
×
305
                                }
×
306

307
                                if !intercepted {
8✔
308
                                        notIntercepted = append(
4✔
309
                                                notIntercepted, p,
4✔
310
                                        )
4✔
311
                                }
4✔
312
                        }
313
                        err := s.htlcSwitch.ForwardPackets(
4✔
314
                                packets.linkQuit, notIntercepted...,
4✔
315
                        )
4✔
316
                        if err != nil {
4✔
317
                                log.Errorf("Cannot forward packets: %v", err)
×
318
                        }
×
319

320
                case fwd := <-s.onchainIntercepted:
4✔
321
                        // For on-chain interceptions, we don't know if it has
4✔
322
                        // already been offered before. This information is in
4✔
323
                        // the forwarding package which isn't easily accessible
4✔
324
                        // from contractcourt. It is likely though that it was
4✔
325
                        // already intercepted in the off-chain flow. And even
4✔
326
                        // if not, it is safe to signal replay so that we won't
4✔
327
                        // unexpectedly skip over this htlc.
4✔
328
                        if _, err := s.forward(fwd, true); err != nil {
4✔
329
                                return err
×
330
                        }
×
331

332
                case res := <-s.resolutionChan:
4✔
333
                        res.errChan <- s.resolve(res.resolution)
4✔
334

335
                case currentBlock, ok := <-s.blockEpochStream.Epochs:
4✔
336
                        if !ok {
4✔
337
                                return errBlockStreamStopped
×
338
                        }
×
339

340
                        s.currentHeight = currentBlock.Height
4✔
341

4✔
342
                        // A new block is appended. Fail any held htlcs that
4✔
343
                        // expire at this height to prevent channel force-close.
4✔
344
                        s.failExpiredHtlcs()
4✔
345

346
                case <-s.quit:
4✔
347
                        return nil
4✔
348
                }
349
        }
350
}
351

352
func (s *InterceptableSwitch) failExpiredHtlcs() {
4✔
353
        s.heldHtlcSet.popAutoFails(
4✔
354
                uint32(s.currentHeight),
4✔
355
                func(fwd InterceptedForward) {
4✔
356
                        err := fwd.FailWithCode(
×
357
                                lnwire.CodeTemporaryChannelFailure,
×
358
                        )
×
359
                        if err != nil {
×
360
                                log.Errorf("Cannot fail packet: %v", err)
×
361
                        }
×
362
                },
363
        )
364
}
365

366
func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
4✔
367
        err := s.interceptor(fwd.Packet())
4✔
368
        if err != nil {
4✔
369
                // Only log the error. If we couldn't send the packet, we assume
×
370
                // that the interceptor will reconnect so that we can retry.
×
371
                log.Debugf("Interceptor cannot handle forward: %v", err)
×
372
        }
×
373
}
374

375
func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) {
4✔
376
        s.interceptor = interceptor
4✔
377

4✔
378
        // Replay all currently held htlcs. When an interceptor is not required,
4✔
379
        // there may be none because they've been cleared after the previous
4✔
380
        // disconnect.
4✔
381
        if interceptor != nil {
8✔
382
                log.Debugf("Interceptor connected")
4✔
383

4✔
384
                s.heldHtlcSet.forEach(s.sendForward)
4✔
385

4✔
386
                return
4✔
387
        }
4✔
388

389
        // The interceptor disconnects. If an interceptor is required, keep the
390
        // held htlcs.
391
        if s.requireInterceptor {
8✔
392
                log.Infof("Interceptor disconnected, retaining held packets")
4✔
393

4✔
394
                return
4✔
395
        }
4✔
396

397
        // Interceptor is not required. Release held forwards.
398
        log.Infof("Interceptor disconnected, resolving held packets")
4✔
399

4✔
400
        s.heldHtlcSet.popAll(func(fwd InterceptedForward) {
8✔
401
                err := fwd.Resume()
4✔
402
                if err != nil {
4✔
403
                        log.Errorf("Failed to resume hold forward %v", err)
×
404
                }
×
405
        })
406
}
407

408
// resolve processes a HTLC given the resolution type specified by the
409
// intercepting client.
410
func (s *InterceptableSwitch) resolve(res *FwdResolution) error {
4✔
411
        intercepted, err := s.heldHtlcSet.pop(res.Key)
4✔
412
        if err != nil {
8✔
413
                return err
4✔
414
        }
4✔
415

416
        switch res.Action {
4✔
417
        case FwdActionResume:
4✔
418
                return intercepted.Resume()
4✔
419

420
        case FwdActionResumeModified:
4✔
421
                return intercepted.ResumeModified(
4✔
422
                        res.InAmountMsat, res.OutAmountMsat,
4✔
423
                        res.OutWireCustomRecords,
4✔
424
                )
4✔
425

426
        case FwdActionSettle:
4✔
427
                return intercepted.Settle(res.Preimage)
4✔
428

429
        case FwdActionFail:
4✔
430
                if len(res.FailureMessage) > 0 {
4✔
431
                        return intercepted.Fail(res.FailureMessage)
×
432
                }
×
433

434
                return intercepted.FailWithCode(res.FailureCode)
4✔
435

436
        default:
×
437
                return fmt.Errorf("unrecognized action %v", res.Action)
×
438
        }
439
}
440

441
// Resolve resolves an intercepted packet.
442
func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
4✔
443
        internalRes := &fwdResolution{
4✔
444
                resolution: res,
4✔
445
                errChan:    make(chan error, 1),
4✔
446
        }
4✔
447

4✔
448
        select {
4✔
449
        case s.resolutionChan <- internalRes:
4✔
450

451
        case <-s.quit:
×
452
                return errors.New("switch shutting down")
×
453
        }
454

455
        select {
4✔
456
        case err := <-internalRes.errChan:
4✔
457
                return err
4✔
458

459
        case <-s.quit:
×
460
                return errors.New("switch shutting down")
×
461
        }
462
}
463

464
// ForwardPackets attempts to forward the batch of htlcs to a connected
465
// interceptor. If the interceptor signals the resume action, the htlcs are
466
// forwarded to the switch. The link's quit signal should be provided to allow
467
// cancellation of forwarding during link shutdown.
468
func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{},
469
        isReplay bool, packets ...*htlcPacket) error {
4✔
470

4✔
471
        // Synchronize with the main event loop. This should be light in the
4✔
472
        // case where there is no interceptor.
4✔
473
        select {
4✔
474
        case s.intercepted <- &interceptedPackets{
475
                packets:  packets,
476
                linkQuit: linkQuit,
477
                isReplay: isReplay,
478
        }:
4✔
479

480
        case <-linkQuit:
×
481
                log.Debugf("Forward cancelled because link quit")
×
482

483
        case <-s.quit:
×
484
                return errors.New("interceptable switch quit")
×
485
        }
486

487
        return nil
4✔
488
}
489

490
// ForwardPacket forwards a single htlc to the external interceptor.
491
func (s *InterceptableSwitch) ForwardPacket(
492
        fwd InterceptedForward) error {
4✔
493

4✔
494
        select {
4✔
495
        case s.onchainIntercepted <- fwd:
4✔
496

497
        case <-s.quit:
×
498
                return errors.New("interceptable switch quit")
×
499
        }
500

501
        return nil
4✔
502
}
503

504
// interceptForward forwards the packet to the external interceptor after
505
// checking the interception criteria.
506
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
507
        isReplay bool) (bool, error) {
4✔
508

4✔
509
        switch htlc := packet.htlc.(type) {
4✔
510
        case *lnwire.UpdateAddHTLC:
4✔
511
                // We are not interested in intercepting initiated payments.
4✔
512
                if packet.incomingChanID == hop.Source {
4✔
513
                        return false, nil
×
514
                }
×
515

516
                intercepted := &interceptedForward{
4✔
517
                        htlc:       htlc,
4✔
518
                        packet:     packet,
4✔
519
                        htlcSwitch: s.htlcSwitch,
4✔
520
                        autoFailHeight: int32(packet.incomingTimeout -
4✔
521
                                s.cltvRejectDelta),
4✔
522
                }
4✔
523

4✔
524
                // Handle forwards that are too close to expiry.
4✔
525
                handled, err := s.handleExpired(intercepted)
4✔
526
                if err != nil {
4✔
527
                        log.Errorf("Error handling intercepted htlc "+
×
528
                                "that expires too soon: circuit=%v, "+
×
529
                                "incoming_timeout=%v, err=%v",
×
530
                                packet.inKey(), packet.incomingTimeout, err)
×
531

×
532
                        // Return false so that the packet is offered as normal
×
533
                        // to the switch. This isn't ideal because interception
×
534
                        // may be configured as always-on and is skipped now.
×
535
                        // Returning true isn't great either, because the htlc
×
536
                        // will remain stuck and potentially force-close the
×
537
                        // channel. But in the end, we should never get here, so
×
538
                        // the actual return value doesn't matter that much.
×
539
                        return false, nil
×
540
                }
×
541
                if handled {
4✔
542
                        return true, nil
×
543
                }
×
544

545
                return s.forward(intercepted, isReplay)
4✔
546

547
        default:
4✔
548
                return false, nil
4✔
549
        }
550
}
551

552
// forward records the intercepted htlc and forwards it to the interceptor.
553
func (s *InterceptableSwitch) forward(
554
        fwd InterceptedForward, isReplay bool) (bool, error) {
4✔
555

4✔
556
        inKey := fwd.Packet().IncomingCircuit
4✔
557

4✔
558
        // Ignore already held htlcs.
4✔
559
        if s.heldHtlcSet.exists(inKey) {
8✔
560
                return true, nil
4✔
561
        }
4✔
562

563
        // If there is no interceptor currently registered, configuration and packet
564
        // replay status determine how the packet is handled.
565
        if s.interceptor == nil {
8✔
566
                // Process normally if an interceptor is not required.
4✔
567
                if !s.requireInterceptor {
8✔
568
                        return false, nil
4✔
569
                }
4✔
570

571
                // We are in interceptor-required mode. If this is a new packet, it is
572
                // still safe to fail back. The interceptor has never seen this packet
573
                // yet. This limits the backlog of htlcs when the interceptor is down.
574
                if !isReplay {
4✔
575
                        err := fwd.FailWithCode(
×
576
                                lnwire.CodeTemporaryChannelFailure,
×
577
                        )
×
578
                        if err != nil {
×
579
                                log.Errorf("Cannot fail packet: %v", err)
×
580
                        }
×
581

582
                        return true, nil
×
583
                }
584

585
                // This packet is a replay. It is not safe to fail back, because the
586
                // interceptor may still signal otherwise upon reconnect. Keep the
587
                // packet in the queue until then.
588
                if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
4✔
589
                        return false, err
×
590
                }
×
591

592
                return true, nil
4✔
593
        }
594

595
        // There is an interceptor registered. We can forward the packet right now.
596
        // Hold it in the queue too to track what is outstanding.
597
        if err := s.heldHtlcSet.push(inKey, fwd); err != nil {
4✔
598
                return false, err
×
599
        }
×
600

601
        s.sendForward(fwd)
4✔
602

4✔
603
        return true, nil
4✔
604
}
605

606
// handleExpired checks that the htlc isn't too close to the channel
607
// force-close broadcast height. If it is, it is cancelled back.
608
func (s *InterceptableSwitch) handleExpired(fwd *interceptedForward) (
609
        bool, error) {
4✔
610

4✔
611
        height := uint32(s.currentHeight)
4✔
612
        if fwd.packet.incomingTimeout >= height+s.cltvInterceptDelta {
8✔
613
                return false, nil
4✔
614
        }
4✔
615

616
        log.Debugf("Interception rejected because htlc "+
×
617
                "expires too soon: circuit=%v, "+
×
618
                "height=%v, incoming_timeout=%v",
×
619
                fwd.packet.inKey(), height,
×
620
                fwd.packet.incomingTimeout)
×
621

×
622
        err := fwd.FailWithCode(
×
623
                lnwire.CodeExpiryTooSoon,
×
624
        )
×
625
        if err != nil {
×
626
                return false, err
×
627
        }
×
628

629
        return true, nil
×
630
}
631

632
// interceptedForward implements the InterceptedForward interface.
633
// It is passed from the switch to external interceptors that are interested
634
// in holding forwards and resolve them manually.
635
type interceptedForward struct {
636
        htlc           *lnwire.UpdateAddHTLC
637
        packet         *htlcPacket
638
        htlcSwitch     *Switch
639
        autoFailHeight int32
640
}
641

642
// Packet returns the intercepted htlc packet.
643
func (f *interceptedForward) Packet() InterceptedPacket {
4✔
644
        return InterceptedPacket{
4✔
645
                IncomingCircuit: models.CircuitKey{
4✔
646
                        ChanID: f.packet.incomingChanID,
4✔
647
                        HtlcID: f.packet.incomingHTLCID,
4✔
648
                },
4✔
649
                OutgoingChanID:       f.packet.outgoingChanID,
4✔
650
                Hash:                 f.htlc.PaymentHash,
4✔
651
                OutgoingExpiry:       f.htlc.Expiry,
4✔
652
                OutgoingAmount:       f.htlc.Amount,
4✔
653
                IncomingAmount:       f.packet.incomingAmount,
4✔
654
                IncomingExpiry:       f.packet.incomingTimeout,
4✔
655
                InOnionCustomRecords: f.packet.inOnionCustomRecords,
4✔
656
                OnionBlob:            f.htlc.OnionBlob,
4✔
657
                AutoFailHeight:       f.autoFailHeight,
4✔
658
                InWireCustomRecords:  f.packet.inWireCustomRecords,
4✔
659
        }
4✔
660
}
4✔
661

662
// Resume resumes the default behavior as if the packet was not intercepted.
663
func (f *interceptedForward) Resume() error {
4✔
664
        // Forward to the switch. A link quit channel isn't needed, because we
4✔
665
        // are on a different thread now.
4✔
666
        return f.htlcSwitch.ForwardPackets(nil, f.packet)
4✔
667
}
4✔
668

669
// ResumeModified resumes the default behavior with field modifications. The
670
// input amount (if provided) specifies that the value of the inbound HTLC
671
// should be interpreted differently from the on-chain amount during further
672
// validation. The presence of an output amount and/or custom records indicates
673
// that those values should be modified on the outgoing HTLC.
674
func (f *interceptedForward) ResumeModified(
675
        inAmountMsat fn.Option[lnwire.MilliSatoshi],
676
        outAmountMsat fn.Option[lnwire.MilliSatoshi],
677
        outWireCustomRecords fn.Option[lnwire.CustomRecords]) error {
4✔
678

4✔
679
        // Convert the optional custom records to the correct type and validate
4✔
680
        // them.
4✔
681
        validatedRecords, err := fn.MapOptionZ(
4✔
682
                outWireCustomRecords,
4✔
683
                func(cr lnwire.CustomRecords) fn.Result[lnwire.CustomRecords] {
8✔
684
                        if len(cr) == 0 {
4✔
685
                                return fn.Ok[lnwire.CustomRecords](nil)
×
686
                        }
×
687

688
                        // Type cast and validate custom records.
689
                        err := cr.Validate()
4✔
690
                        if err != nil {
4✔
691
                                return fn.Err[lnwire.CustomRecords](
×
692
                                        fmt.Errorf("failed to validate "+
×
693
                                                "custom records: %w", err),
×
694
                                )
×
695
                        }
×
696

697
                        return fn.Ok(cr)
4✔
698
                },
699
        ).Unpack()
700
        if err != nil {
4✔
701
                return fmt.Errorf("failed to encode custom records: %w",
×
702
                        err)
×
703
        }
×
704

705
        // Set the incoming amount, if it is provided, on the packet.
706
        inAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
4✔
707
                f.packet.incomingAmount = amount
×
708
        })
×
709

710
        // Modify the wire message contained in the packet.
711
        switch htlc := f.packet.htlc.(type) {
4✔
712
        case *lnwire.UpdateAddHTLC:
4✔
713
                outAmountMsat.WhenSome(func(amount lnwire.MilliSatoshi) {
8✔
714
                        f.packet.amount = amount
4✔
715
                        htlc.Amount = amount
4✔
716
                })
4✔
717

718
                // Merge custom records with any validated records that were
719
                // added in the modify request, overwriting any existing values
720
                // with those supplied in the modifier API.
721
                htlc.CustomRecords = htlc.CustomRecords.MergedCopy(
4✔
722
                        validatedRecords,
4✔
723
                )
4✔
724

725
        case *lnwire.UpdateFulfillHTLC:
×
726
                if len(validatedRecords) > 0 {
×
727
                        htlc.CustomRecords = validatedRecords
×
728
                }
×
729
        }
730

731
        log.Tracef("Forwarding packet %v", lnutils.SpewLogClosure(f.packet))
4✔
732

4✔
733
        // Forward to the switch. A link quit channel isn't needed, because we
4✔
734
        // are on a different thread now.
4✔
735
        return f.htlcSwitch.ForwardPackets(nil, f.packet)
4✔
736
}
737

738
// Fail notifies the intention to Fail an existing hold forward with an
739
// encrypted failure reason.
740
func (f *interceptedForward) Fail(reason []byte) error {
×
741
        obfuscatedReason := f.packet.obfuscator.IntermediateEncrypt(reason)
×
742

×
743
        return f.resolve(&lnwire.UpdateFailHTLC{
×
744
                Reason: obfuscatedReason,
×
745
        })
×
746
}
×
747

748
// FailWithCode notifies the intention to fail an existing hold forward with the
749
// specified failure code.
750
func (f *interceptedForward) FailWithCode(code lnwire.FailCode) error {
4✔
751
        shaOnionBlob := func() [32]byte {
4✔
752
                return sha256.Sum256(f.htlc.OnionBlob[:])
×
753
        }
×
754

755
        // Create a local failure.
756
        var failureMsg lnwire.FailureMessage
4✔
757

4✔
758
        switch code {
4✔
759
        case lnwire.CodeInvalidOnionVersion:
×
760
                failureMsg = &lnwire.FailInvalidOnionVersion{
×
761
                        OnionSHA256: shaOnionBlob(),
×
762
                }
×
763

764
        case lnwire.CodeInvalidOnionHmac:
×
765
                failureMsg = &lnwire.FailInvalidOnionHmac{
×
766
                        OnionSHA256: shaOnionBlob(),
×
767
                }
×
768

769
        case lnwire.CodeInvalidOnionKey:
×
770
                failureMsg = &lnwire.FailInvalidOnionKey{
×
771
                        OnionSHA256: shaOnionBlob(),
×
772
                }
×
773

774
        case lnwire.CodeTemporaryChannelFailure:
4✔
775
                update := f.htlcSwitch.failAliasUpdate(
4✔
776
                        f.packet.incomingChanID, true,
4✔
777
                )
4✔
778
                if update == nil {
8✔
779
                        // Fallback to the original, non-alias behavior.
4✔
780
                        var err error
4✔
781
                        update, err = f.htlcSwitch.cfg.FetchLastChannelUpdate(
4✔
782
                                f.packet.incomingChanID,
4✔
783
                        )
4✔
784
                        if err != nil {
4✔
785
                                return err
×
786
                        }
×
787
                }
788

789
                failureMsg = lnwire.NewTemporaryChannelFailure(update)
4✔
790

791
        case lnwire.CodeExpiryTooSoon:
×
792
                update, err := f.htlcSwitch.cfg.FetchLastChannelUpdate(
×
793
                        f.packet.incomingChanID,
×
794
                )
×
795
                if err != nil {
×
796
                        return err
×
797
                }
×
798

799
                failureMsg = lnwire.NewExpiryTooSoon(*update)
×
800

801
        default:
×
802
                return ErrUnsupportedFailureCode
×
803
        }
804

805
        // Encrypt the failure for the first hop. This node will be the origin
806
        // of the failure.
807
        reason, err := f.packet.obfuscator.EncryptFirstHop(failureMsg)
4✔
808
        if err != nil {
4✔
809
                return fmt.Errorf("failed to encrypt failure reason %w", err)
×
810
        }
×
811

812
        return f.resolve(&lnwire.UpdateFailHTLC{
4✔
813
                Reason: reason,
4✔
814
        })
4✔
815
}
816

817
// Settle forwards a settled packet to the switch.
818
func (f *interceptedForward) Settle(preimage lntypes.Preimage) error {
4✔
819
        if !preimage.Matches(f.htlc.PaymentHash) {
4✔
820
                return errors.New("preimage does not match hash")
×
821
        }
×
822
        return f.resolve(&lnwire.UpdateFulfillHTLC{
4✔
823
                PaymentPreimage: preimage,
4✔
824
        })
4✔
825
}
826

827
// resolve is used for both Settle and Fail and forwards the message to the
828
// switch.
829
func (f *interceptedForward) resolve(message lnwire.Message) error {
4✔
830
        pkt := &htlcPacket{
4✔
831
                incomingChanID: f.packet.incomingChanID,
4✔
832
                incomingHTLCID: f.packet.incomingHTLCID,
4✔
833
                outgoingChanID: f.packet.outgoingChanID,
4✔
834
                outgoingHTLCID: f.packet.outgoingHTLCID,
4✔
835
                isResolution:   true,
4✔
836
                circuit:        f.packet.circuit,
4✔
837
                htlc:           message,
4✔
838
                obfuscator:     f.packet.obfuscator,
4✔
839
                sourceRef:      f.packet.sourceRef,
4✔
840
        }
4✔
841
        return f.htlcSwitch.mailOrchestrator.Deliver(pkt.incomingChanID, pkt)
4✔
842
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc