• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16027444140

02 Jul 2025 02:08PM UTC coverage: 57.777% (-0.03%) from 57.803%
16027444140

Pull #10018

github

web-flow
Merge 6574a2b47 into 1d2e5472b
Pull Request #10018: Refactor link's long methods

429 of 808 new or added lines in 1 file covered. (53.09%)

60 existing lines in 9 files now uncovered.

98478 of 170444 relevant lines covered (57.78%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.71
/htlcswitch/link.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "context"
6
        crand "crypto/rand"
7
        "crypto/sha256"
8
        "errors"
9
        "fmt"
10
        prand "math/rand"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/btcsuite/btclog/v2"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/contractcourt"
20
        "github.com/lightningnetwork/lnd/fn/v2"
21
        "github.com/lightningnetwork/lnd/graph/db/models"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
23
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/invoices"
26
        "github.com/lightningnetwork/lnd/lnpeer"
27
        "github.com/lightningnetwork/lnd/lntypes"
28
        "github.com/lightningnetwork/lnd/lnutils"
29
        "github.com/lightningnetwork/lnd/lnwallet"
30
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
31
        "github.com/lightningnetwork/lnd/lnwire"
32
        "github.com/lightningnetwork/lnd/queue"
33
        "github.com/lightningnetwork/lnd/record"
34
        "github.com/lightningnetwork/lnd/ticker"
35
        "github.com/lightningnetwork/lnd/tlv"
36
)
37

38
const (
39
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
40
        // the node accepts for forwarded payments. The value is relative to the
41
        // current block height. The reason to have a maximum is to prevent
42
        // funds getting locked up unreasonably long. Otherwise, an attacker
43
        // willing to lock its own funds too, could force the funds of this node
44
        // to be locked up for an indefinite (max int32) number of blocks.
45
        //
46
        // The value 2016 corresponds to on average two weeks worth of blocks
47
        // and is based on the maximum number of hops (20), the default CLTV
48
        // delta (40), and some extra margin to account for the other lightning
49
        // implementations and past lnd versions which used to have a default
50
        // CLTV delta of 144.
51
        DefaultMaxOutgoingCltvExpiry = 2016
52

53
        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
54
        // which a link should propose to update its commitment fee rate.
55
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
56

57
        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
58
        // which a link should propose to update its commitment fee rate.
59
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
60

61
        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
62
        // a channel's commitment fee to be of its balance. This only applies to
63
        // the initiator of the channel.
64
        DefaultMaxLinkFeeAllocation float64 = 0.5
65
)
66

67
// ExpectedFee computes the expected fee for a given htlc amount. The value
68
// returned from this function is to be used as a sanity check when forwarding
69
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
70
// forwarding policy.
71
//
72
// TODO(roasbeef): also add in current available channel bandwidth, inverse
73
// func
74
func ExpectedFee(f models.ForwardingPolicy,
75
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
3✔
76

3✔
77
        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
3✔
78
}
3✔
79

80
// ChannelLinkConfig defines the configuration for the channel link. ALL
81
// elements within the configuration MUST be non-nil for channel link to carry
82
// out its duties.
83
type ChannelLinkConfig struct {
84
        // FwrdingPolicy is the initial forwarding policy to be used when
85
        // deciding whether to forwarding incoming HTLC's or not. This value
86
        // can be updated with subsequent calls to UpdateForwardingPolicy
87
        // targeted at a given ChannelLink concrete interface implementation.
88
        FwrdingPolicy models.ForwardingPolicy
89

90
        // Circuits provides restricted access to the switch's circuit map,
91
        // allowing the link to open and close circuits.
92
        Circuits CircuitModifier
93

94
        // BestHeight returns the best known height.
95
        BestHeight func() uint32
96

97
        // ForwardPackets attempts to forward the batch of htlcs through the
98
        // switch. The function returns and error in case it fails to send one or
99
        // more packets. The link's quit signal should be provided to allow
100
        // cancellation of forwarding during link shutdown.
101
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error
102

103
        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
104
        // blobs, which are then used to inform how to forward an HTLC.
105
        //
106
        // NOTE: This function assumes the same set of readers and preimages
107
        // are always presented for the same identifier. The last boolean is
108
        // used to decide whether this is a reforwarding or not - when it's
109
        // reforwarding, we skip the replay check enforced in our decay log.
110
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
111
                []hop.DecodeHopIteratorResponse, error)
112

113
        // ExtractErrorEncrypter function is responsible for decoding HTLC
114
        // Sphinx onion blob, and creating onion failure obfuscator.
115
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
116

117
        // FetchLastChannelUpdate retrieves the latest routing policy for a
118
        // target channel. This channel will typically be the outgoing channel
119
        // specified when we receive an incoming HTLC.  This will be used to
120
        // provide payment senders our latest policy when sending encrypted
121
        // error messages.
122
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
123
                *lnwire.ChannelUpdate1, error)
124

125
        // Peer is a lightning network node with which we have the channel link
126
        // opened.
127
        Peer lnpeer.Peer
128

129
        // Registry is a sub-system which responsible for managing the invoices
130
        // in thread-safe manner.
131
        Registry InvoiceDatabase
132

133
        // PreimageCache is a global witness beacon that houses any new
134
        // preimages discovered by other links. We'll use this to add new
135
        // witnesses that we discover which will notify any sub-systems
136
        // subscribed to new events.
137
        PreimageCache contractcourt.WitnessBeacon
138

139
        // OnChannelFailure is a function closure that we'll call if the
140
        // channel failed for some reason. Depending on the severity of the
141
        // error, the closure potentially must force close this channel and
142
        // disconnect the peer.
143
        //
144
        // NOTE: The method must return in order for the ChannelLink to be able
145
        // to shut down properly.
146
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
147
                LinkFailureError)
148

149
        // UpdateContractSignals is a function closure that we'll use to update
150
        // outside sub-systems with this channel's latest ShortChannelID.
151
        UpdateContractSignals func(*contractcourt.ContractSignals) error
152

153
        // NotifyContractUpdate is a function closure that we'll use to update
154
        // the contractcourt and more specifically the ChannelArbitrator of the
155
        // latest channel state.
156
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error
157

158
        // ChainEvents is an active subscription to the chain watcher for this
159
        // channel to be notified of any on-chain activity related to this
160
        // channel.
161
        ChainEvents *contractcourt.ChainEventSubscription
162

163
        // FeeEstimator is an instance of a live fee estimator which will be
164
        // used to dynamically regulate the current fee of the commitment
165
        // transaction to ensure timely confirmation.
166
        FeeEstimator chainfee.Estimator
167

168
        // hodl.Mask is a bitvector composed of hodl.Flags, specifying breakpoints
169
        // for HTLC forwarding internal to the switch.
170
        //
171
        // NOTE: This should only be used for testing.
172
        HodlMask hodl.Mask
173

174
        // SyncStates is used to indicate that we need send the channel
175
        // reestablishment message to the remote peer. It should be done if our
176
        // clients have been restarted, or remote peer have been reconnected.
177
        SyncStates bool
178

179
        // BatchTicker is the ticker that determines the interval that we'll
180
        // use to check the batch to see if there're any updates we should
181
        // flush out. By batching updates into a single commit, we attempt to
182
        // increase throughput by maximizing the number of updates coalesced
183
        // into a single commit.
184
        BatchTicker ticker.Ticker
185

186
        // FwdPkgGCTicker is the ticker determining the frequency at which
187
        // garbage collection of forwarding packages occurs. We use a
188
        // time-based approach, as opposed to block epochs, as to not hinder
189
        // syncing.
190
        FwdPkgGCTicker ticker.Ticker
191

192
        // PendingCommitTicker is a ticker that allows the link to determine if
193
        // a locally initiated commitment dance gets stuck waiting for the
194
        // remote party to revoke.
195
        PendingCommitTicker ticker.Ticker
196

197
        // BatchSize is the max size of a batch of updates done to the link
198
        // before we do a state update.
199
        BatchSize uint32
200

201
        // UnsafeReplay will cause a link to replay the adds in its latest
202
        // commitment txn after the link is restarted. This should only be used
203
        // in testing, it is here to ensure the sphinx replay detection on the
204
        // receiving node is persistent.
205
        UnsafeReplay bool
206

207
        // MinUpdateTimeout represents the minimum interval in which a link
208
        // will propose to update its commitment fee rate. A random timeout will
209
        // be selected between this and MaxUpdateTimeout.
210
        MinUpdateTimeout time.Duration
211

212
        // MaxUpdateTimeout represents the maximum interval in which a link
213
        // will propose to update its commitment fee rate. A random timeout will
214
        // be selected between this and MinUpdateTimeout.
215
        MaxUpdateTimeout time.Duration
216

217
        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
218
        // an htlc where we don't offer an htlc anymore. This should be at least
219
        // the outgoing broadcast delta, because in any case we don't want to
220
        // risk offering an htlc that triggers channel closure.
221
        OutgoingCltvRejectDelta uint32
222

223
        // TowerClient is an optional engine that manages the signing,
224
        // encrypting, and uploading of justice transactions to the daemon's
225
        // configured set of watchtowers for legacy channels.
226
        TowerClient TowerClient
227

228
        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
229
        // should accept for a forwarded HTLC. The value is relative to the
230
        // current block height.
231
        MaxOutgoingCltvExpiry uint32
232

233
        // MaxFeeAllocation is the highest allocation we'll allow a channel's
234
        // commitment fee to be of its balance. This only applies to the
235
        // initiator of the channel.
236
        MaxFeeAllocation float64
237

238
        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
239
        // the initiator for channels of the anchor type.
240
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
241

242
        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
243
        // link is first started.
244
        NotifyActiveLink func(wire.OutPoint)
245

246
        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
247
        // channels becomes active.
248
        NotifyActiveChannel func(wire.OutPoint)
249

250
        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
251
        // when channels become inactive.
252
        NotifyInactiveChannel func(wire.OutPoint)
253

254
        // NotifyInactiveLinkEvent allows the switch to tell the
255
        // ChannelNotifier when a channel link become inactive.
256
        NotifyInactiveLinkEvent func(wire.OutPoint)
257

258
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
259
        // events through.
260
        HtlcNotifier htlcNotifier
261

262
        // FailAliasUpdate is a function used to fail an HTLC for an
263
        // option_scid_alias channel.
264
        FailAliasUpdate func(sid lnwire.ShortChannelID,
265
                incoming bool) *lnwire.ChannelUpdate1
266

267
        // GetAliases is used by the link and switch to fetch the set of
268
        // aliases for a given link.
269
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
270

271
        // PreviouslySentShutdown is an optional value that is set if, at the
272
        // time of the link being started, persisted shutdown info was found for
273
        // the channel. This value being set means that we previously sent a
274
        // Shutdown message to our peer, and so we should do so again on
275
        // re-establish and should not allow anymore HTLC adds on the outgoing
276
        // direction of the link.
277
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]
278

279
        // Adds the option to disable forwarding payments in blinded routes
280
        // by failing back any blinding-related payloads as if they were
281
        // invalid.
282
        DisallowRouteBlinding bool
283

284
        // DisallowQuiescence is a flag that can be used to disable the
285
        // quiescence protocol.
286
        DisallowQuiescence bool
287

288
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
289
        // restrict the flow of HTLCs and fee updates.
290
        MaxFeeExposure lnwire.MilliSatoshi
291

292
        // ShouldFwdExpEndorsement is a closure that indicates whether the link
293
        // should forward experimental endorsement signals.
294
        ShouldFwdExpEndorsement func() bool
295

296
        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
297
        // used to manage the bandwidth of the link.
298
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
299
}
300

301
// channelLink is the service which drives a channel's commitment update
302
// state-machine. In the event that an HTLC needs to be propagated to another
303
// link, the forward handler from config is used which sends HTLC to the
304
// switch. Additionally, the link encapsulate logic of commitment protocol
305
// message ordering and updates.
306
type channelLink struct {
307
        // The following fields are only meant to be used *atomically*
308
        started       int32
309
        reestablished int32
310
        shutdown      int32
311

312
        // failed should be set to true in case a link error happens, making
313
        // sure we don't process any more updates.
314
        failed bool
315

316
        // keystoneBatch represents a volatile list of keystones that must be
317
        // written before attempting to sign the next commitment txn. These
318
        // represent all the HTLC's forwarded to the link from the switch. Once
319
        // we lock them into our outgoing commitment, then the circuit has a
320
        // keystone, and is fully opened.
321
        keystoneBatch []Keystone
322

323
        // openedCircuits is the set of all payment circuits that will be open
324
        // once we make our next commitment. After making the commitment we'll
325
        // ACK all these from our mailbox to ensure that they don't get
326
        // re-delivered if we reconnect.
327
        openedCircuits []CircuitKey
328

329
        // closedCircuits is the set of all payment circuits that will be
330
        // closed once we make our next commitment. After taking the commitment
331
        // we'll ACK all these to ensure that they don't get re-delivered if we
332
        // reconnect.
333
        closedCircuits []CircuitKey
334

335
        // channel is a lightning network channel to which we apply htlc
336
        // updates.
337
        channel *lnwallet.LightningChannel
338

339
        // cfg is a structure which carries all dependable fields/handlers
340
        // which may affect behaviour of the service.
341
        cfg ChannelLinkConfig
342

343
        // mailBox is the main interface between the outside world and the
344
        // link. All incoming messages will be sent over this mailBox. Messages
345
        // include new updates from our connected peer, and new packets to be
346
        // forwarded sent by the switch.
347
        mailBox MailBox
348

349
        // upstream is a channel that new messages sent from the remote peer to
350
        // the local peer will be sent across.
351
        upstream chan lnwire.Message
352

353
        // downstream is a channel in which new multi-hop HTLC's to be
354
        // forwarded will be sent across. Messages from this channel are sent
355
        // by the HTLC switch.
356
        downstream chan *htlcPacket
357

358
        // updateFeeTimer is the timer responsible for updating the link's
359
        // commitment fee every time it fires.
360
        updateFeeTimer *time.Timer
361

362
        // uncommittedPreimages stores a list of all preimages that have been
363
        // learned since receiving the last CommitSig from the remote peer. The
364
        // batch will be flushed just before accepting the subsequent CommitSig
365
        // or on shutdown to avoid doing a write for each preimage received.
366
        uncommittedPreimages []lntypes.Preimage
367

368
        sync.RWMutex
369

370
        // hodlQueue is used to receive exit hop htlc resolutions from invoice
371
        // registry.
372
        hodlQueue *queue.ConcurrentQueue
373

374
        // hodlMap stores related htlc data for a circuit key. It allows
375
        // resolving those htlcs when we receive a message on hodlQueue.
376
        hodlMap map[models.CircuitKey]hodlHtlc
377

378
        // log is a link-specific logging instance.
379
        log btclog.Logger
380

381
        // isOutgoingAddBlocked tracks whether the channelLink can send an
382
        // UpdateAddHTLC.
383
        isOutgoingAddBlocked atomic.Bool
384

385
        // isIncomingAddBlocked tracks whether the channelLink can receive an
386
        // UpdateAddHTLC.
387
        isIncomingAddBlocked atomic.Bool
388

389
        // flushHooks is a hookMap that is triggered when we reach a channel
390
        // state with no live HTLCs.
391
        flushHooks hookMap
392

393
        // outgoingCommitHooks is a hookMap that is triggered after we send our
394
        // next CommitSig.
395
        outgoingCommitHooks hookMap
396

397
        // incomingCommitHooks is a hookMap that is triggered after we receive
398
        // our next CommitSig.
399
        incomingCommitHooks hookMap
400

401
        // quiescer is the state machine that tracks where this channel is with
402
        // respect to the quiescence protocol.
403
        quiescer Quiescer
404

405
        // quiescenceReqs is a queue of requests to quiesce this link. The
406
        // members of the queue are send-only channels we should call back with
407
        // the result.
408
        quiescenceReqs chan StfuReq
409

410
        // cg is a helper that encapsulates a wait group and quit channel and
411
        // allows contexts that either block or cancel on those depending on
412
        // the use case.
413
        cg *fn.ContextGuard
414
}
415

416
// hookMap is a data structure that is used to track the hooks that need to be
417
// called in various parts of the channelLink's lifecycle.
418
//
419
// WARNING: NOT thread-safe.
420
type hookMap struct {
421
        // allocIdx keeps track of the next id we haven't yet allocated.
422
        allocIdx atomic.Uint64
423

424
        // transient is a map of hooks that are only called the next time invoke
425
        // is called. These hooks are deleted during invoke.
426
        transient map[uint64]func()
427

428
        // newTransients is a channel that we use to accept new hooks into the
429
        // hookMap.
430
        newTransients chan func()
431
}
432

433
// newHookMap initializes a new empty hookMap.
434
func newHookMap() hookMap {
3✔
435
        return hookMap{
3✔
436
                allocIdx:      atomic.Uint64{},
3✔
437
                transient:     make(map[uint64]func()),
3✔
438
                newTransients: make(chan func()),
3✔
439
        }
3✔
440
}
3✔
441

442
// alloc allocates space in the hook map for the supplied hook, the second
443
// argument determines whether it goes into the transient or persistent part
444
// of the hookMap.
445
func (m *hookMap) alloc(hook func()) uint64 {
3✔
446
        // We assume we never overflow a uint64. Seems OK.
3✔
447
        hookID := m.allocIdx.Add(1)
3✔
448
        if hookID == 0 {
3✔
449
                panic("hookMap allocIdx overflow")
×
450
        }
451
        m.transient[hookID] = hook
3✔
452

3✔
453
        return hookID
3✔
454
}
455

456
// invoke is used on a hook map to call all the registered hooks and then clear
457
// out the transient hooks so they are not called again.
458
func (m *hookMap) invoke() {
3✔
459
        for _, hook := range m.transient {
6✔
460
                hook()
3✔
461
        }
3✔
462

463
        m.transient = make(map[uint64]func())
3✔
464
}
465

466
// hodlHtlc contains htlc data that is required for resolution.
467
type hodlHtlc struct {
468
        add        lnwire.UpdateAddHTLC
469
        sourceRef  channeldb.AddRef
470
        obfuscator hop.ErrorEncrypter
471
}
472

473
// NewChannelLink creates a new instance of a ChannelLink given a configuration
474
// and active channel that will be used to verify/apply updates to.
475
func NewChannelLink(cfg ChannelLinkConfig,
476
        channel *lnwallet.LightningChannel) ChannelLink {
3✔
477

3✔
478
        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
3✔
479

3✔
480
        // If the max fee exposure isn't set, use the default.
3✔
481
        if cfg.MaxFeeExposure == 0 {
3✔
482
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
×
483
        }
×
484

485
        var qsm Quiescer
3✔
486
        if !cfg.DisallowQuiescence {
6✔
487
                qsm = NewQuiescer(QuiescerCfg{
3✔
488
                        chanID: lnwire.NewChanIDFromOutPoint(
3✔
489
                                channel.ChannelPoint(),
3✔
490
                        ),
3✔
491
                        channelInitiator: channel.Initiator(),
3✔
492
                        sendMsg: func(s lnwire.Stfu) error {
6✔
493
                                return cfg.Peer.SendMessage(false, &s)
3✔
494
                        },
3✔
495
                        timeoutDuration: defaultQuiescenceTimeout,
496
                        onTimeout: func() {
×
497
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
×
498
                        },
×
499
                })
500
        } else {
×
501
                qsm = &quiescerNoop{}
×
502
        }
×
503

504
        quiescenceReqs := make(
3✔
505
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
3✔
506
        )
3✔
507

3✔
508
        return &channelLink{
3✔
509
                cfg:                 cfg,
3✔
510
                channel:             channel,
3✔
511
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
3✔
512
                hodlQueue:           queue.NewConcurrentQueue(10),
3✔
513
                log:                 log.WithPrefix(logPrefix),
3✔
514
                flushHooks:          newHookMap(),
3✔
515
                outgoingCommitHooks: newHookMap(),
3✔
516
                incomingCommitHooks: newHookMap(),
3✔
517
                quiescer:            qsm,
3✔
518
                quiescenceReqs:      quiescenceReqs,
3✔
519
                cg:                  fn.NewContextGuard(),
3✔
520
        }
3✔
521
}
522

523
// A compile time check to ensure channelLink implements the ChannelLink
524
// interface.
525
var _ ChannelLink = (*channelLink)(nil)
526

527
// Start starts all helper goroutines required for the operation of the channel
528
// link.
529
//
530
// NOTE: Part of the ChannelLink interface.
531
func (l *channelLink) Start() error {
3✔
532
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
3✔
533
                err := fmt.Errorf("channel link(%v): already started", l)
×
534
                l.log.Warn("already started")
×
535
                return err
×
536
        }
×
537

538
        l.log.Info("starting")
3✔
539

3✔
540
        // If the config supplied watchtower client, ensure the channel is
3✔
541
        // registered before trying to use it during operation.
3✔
542
        if l.cfg.TowerClient != nil {
6✔
543
                err := l.cfg.TowerClient.RegisterChannel(
3✔
544
                        l.ChanID(), l.channel.State().ChanType,
3✔
545
                )
3✔
546
                if err != nil {
3✔
547
                        return err
×
548
                }
×
549
        }
550

551
        l.mailBox.ResetMessages()
3✔
552
        l.hodlQueue.Start()
3✔
553

3✔
554
        // Before launching the htlcManager messages, revert any circuits that
3✔
555
        // were marked open in the switch's circuit map, but did not make it
3✔
556
        // into a commitment txn. We use the next local htlc index as the cut
3✔
557
        // off point, since all indexes below that are committed. This action
3✔
558
        // is only performed if the link's final short channel ID has been
3✔
559
        // assigned, otherwise we would try to trim the htlcs belonging to the
3✔
560
        // all-zero, hop.Source ID.
3✔
561
        if l.ShortChanID() != hop.Source {
6✔
562
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
3✔
563
                if err != nil {
3✔
564
                        return fmt.Errorf("unable to retrieve next local "+
×
565
                                "htlc index: %v", err)
×
566
                }
×
567

568
                // NOTE: This is automatically done by the switch when it
569
                // starts up, but is necessary to prevent inconsistencies in
570
                // the case that the link flaps. This is a result of a link's
571
                // life-cycle being shorter than that of the switch.
572
                chanID := l.ShortChanID()
3✔
573
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
3✔
574
                if err != nil {
3✔
575
                        return fmt.Errorf("unable to trim circuits above "+
×
576
                                "local htlc index %d: %v", localHtlcIndex, err)
×
577
                }
×
578

579
                // Since the link is live, before we start the link we'll update
580
                // the ChainArbitrator with the set of new channel signals for
581
                // this channel.
582
                //
583
                // TODO(roasbeef): split goroutines within channel arb to avoid
584
                go func() {
6✔
585
                        signals := &contractcourt.ContractSignals{
3✔
586
                                ShortChanID: l.channel.ShortChanID(),
3✔
587
                        }
3✔
588

3✔
589
                        err := l.cfg.UpdateContractSignals(signals)
3✔
590
                        if err != nil {
3✔
591
                                l.log.Errorf("unable to update signals")
×
592
                        }
×
593
                }()
594
        }
595

596
        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
3✔
597

3✔
598
        l.cg.WgAdd(1)
3✔
599
        go l.htlcManager(context.TODO())
3✔
600

3✔
601
        return nil
3✔
602
}
603

604
// Stop gracefully stops all active helper goroutines, then waits until they've
605
// exited.
606
//
607
// NOTE: Part of the ChannelLink interface.
608
func (l *channelLink) Stop() {
3✔
609
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
3✔
610
                l.log.Warn("already stopped")
×
611
                return
×
612
        }
×
613

614
        l.log.Info("stopping")
3✔
615

3✔
616
        // As the link is stopping, we are no longer interested in htlc
3✔
617
        // resolutions coming from the invoice registry.
3✔
618
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())
3✔
619

3✔
620
        if l.cfg.ChainEvents.Cancel != nil {
6✔
621
                l.cfg.ChainEvents.Cancel()
3✔
622
        }
3✔
623

624
        // Ensure the channel for the timer is drained.
625
        if l.updateFeeTimer != nil {
6✔
626
                if !l.updateFeeTimer.Stop() {
3✔
627
                        select {
×
628
                        case <-l.updateFeeTimer.C:
×
629
                        default:
×
630
                        }
631
                }
632
        }
633

634
        if l.hodlQueue != nil {
6✔
635
                l.hodlQueue.Stop()
3✔
636
        }
3✔
637

638
        l.cg.Quit()
3✔
639
        l.cg.WgWait()
3✔
640

3✔
641
        // Now that the htlcManager has completely exited, reset the packet
3✔
642
        // courier. This allows the mailbox to revaluate any lingering Adds that
3✔
643
        // were delivered but didn't make it on a commitment to be failed back
3✔
644
        // if the link is offline for an extended period of time. The error is
3✔
645
        // ignored since it can only fail when the daemon is exiting.
3✔
646
        _ = l.mailBox.ResetPackets()
3✔
647

3✔
648
        // As a final precaution, we will attempt to flush any uncommitted
3✔
649
        // preimages to the preimage cache. The preimages should be re-delivered
3✔
650
        // after channel reestablishment, however this adds an extra layer of
3✔
651
        // protection in case the peer never returns. Without this, we will be
3✔
652
        // unable to settle any contracts depending on the preimages even though
3✔
653
        // we had learned them at some point.
3✔
654
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
3✔
655
        if err != nil {
3✔
656
                l.log.Errorf("unable to add preimages=%v to cache: %v",
×
657
                        l.uncommittedPreimages, err)
×
658
        }
×
659
}
660

661
// WaitForShutdown blocks until the link finishes shutting down, which includes
662
// termination of all dependent goroutines.
663
func (l *channelLink) WaitForShutdown() {
×
664
        l.cg.WgWait()
×
665
}
×
666

667
// EligibleToForward returns a bool indicating if the channel is able to
668
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
669
// we are eligible to update AND the channel isn't currently flushing the
670
// outgoing half of the channel.
671
//
672
// NOTE: MUST NOT be called from the main event loop.
673
func (l *channelLink) EligibleToForward() bool {
3✔
674
        l.RLock()
3✔
675
        defer l.RUnlock()
3✔
676

3✔
677
        return l.eligibleToForward()
3✔
678
}
3✔
679

680
// eligibleToForward returns a bool indicating if the channel is able to
681
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
682
// we are eligible to update AND the channel isn't currently flushing the
683
// outgoing half of the channel.
684
//
685
// NOTE: MUST be called from the main event loop.
686
func (l *channelLink) eligibleToForward() bool {
3✔
687
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
3✔
688
}
3✔
689

690
// eligibleToUpdate returns a bool indicating if the channel is able to update
691
// channel state. We're able to update channel state if we know the remote
692
// party's next revocation point. Otherwise, we can't initiate new channel
693
// state. We also require that the short channel ID not be the all-zero source
694
// ID, meaning that the channel has had its ID finalized.
695
//
696
// NOTE: MUST be called from the main event loop.
697
func (l *channelLink) eligibleToUpdate() bool {
3✔
698
        return l.channel.RemoteNextRevocation() != nil &&
3✔
699
                l.channel.ShortChanID() != hop.Source &&
3✔
700
                l.isReestablished() &&
3✔
701
                l.quiescer.CanSendUpdates()
3✔
702
}
3✔
703

704
// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
705
// the specified direction. It returns true if the state was changed and false
706
// if the desired state was already set before the method was called.
707
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
×
708
        if linkDirection == Outgoing {
×
709
                return l.isOutgoingAddBlocked.Swap(false)
×
710
        }
×
711

712
        return l.isIncomingAddBlocked.Swap(false)
×
713
}
714

715
// DisableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
716
// the specified direction. It returns true if the state was changed and false
717
// if the desired state was already set before the method was called.
718
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
3✔
719
        if linkDirection == Outgoing {
6✔
720
                return !l.isOutgoingAddBlocked.Swap(true)
3✔
721
        }
3✔
722

723
        return !l.isIncomingAddBlocked.Swap(true)
3✔
724
}
725

726
// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
727
// the argument.
728
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
3✔
729
        if linkDirection == Outgoing {
6✔
730
                return l.isOutgoingAddBlocked.Load()
3✔
731
        }
3✔
732

733
        return l.isIncomingAddBlocked.Load()
3✔
734
}
735

736
// OnFlushedOnce adds a hook that will be called the next time the channel
737
// state reaches zero htlcs. This hook will only ever be called once. If the
738
// channel state already has zero htlcs, then this will be called immediately.
739
func (l *channelLink) OnFlushedOnce(hook func()) {
3✔
740
        select {
3✔
741
        case l.flushHooks.newTransients <- hook:
3✔
742
        case <-l.cg.Done():
×
743
        }
744
}
745

746
// OnCommitOnce adds a hook that will be called the next time a CommitSig
747
// message is sent in the argument's LinkDirection. This hook will only ever be
748
// called once. If no CommitSig is owed in the argument's LinkDirection, then
749
// we will call this hook be run immediately.
750
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
3✔
751
        var queue chan func()
3✔
752

3✔
753
        if direction == Outgoing {
6✔
754
                queue = l.outgoingCommitHooks.newTransients
3✔
755
        } else {
3✔
756
                queue = l.incomingCommitHooks.newTransients
×
757
        }
×
758

759
        select {
3✔
760
        case queue <- hook:
3✔
761
        case <-l.cg.Done():
×
762
        }
763
}
764

765
// InitStfu allows us to initiate quiescence on this link. It returns a receive
766
// only channel that will block until quiescence has been achieved, or
767
// definitively fails.
768
//
769
// This operation has been added to allow channels to be quiesced via RPC. It
770
// may be removed or reworked in the future as RPC initiated quiescence is a
771
// holdover until we have downstream protocols that use it.
772
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
3✔
773
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
3✔
774
                fn.Unit{},
3✔
775
        )
3✔
776

3✔
777
        select {
3✔
778
        case l.quiescenceReqs <- req:
3✔
779
        case <-l.cg.Done():
×
780
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
×
781
        }
782

783
        return out
3✔
784
}
785

786
// isReestablished returns true if the link has successfully completed the
787
// channel reestablishment dance.
788
func (l *channelLink) isReestablished() bool {
3✔
789
        return atomic.LoadInt32(&l.reestablished) == 1
3✔
790
}
3✔
791

792
// markReestablished signals that the remote peer has successfully exchanged
793
// channel reestablish messages and that the channel is ready to process
794
// subsequent messages.
795
func (l *channelLink) markReestablished() {
3✔
796
        atomic.StoreInt32(&l.reestablished, 1)
3✔
797
}
3✔
798

799
// IsUnadvertised returns true if the underlying channel is unadvertised.
800
func (l *channelLink) IsUnadvertised() bool {
3✔
801
        state := l.channel.State()
3✔
802
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
3✔
803
}
3✔
804

805
// sampleNetworkFee samples the current fee rate on the network to get into the
806
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
807
// this is the native rate used when computing the fee for commitment
808
// transactions, and the second-level HTLC transactions.
809
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
×
810
        // We'll first query for the sat/kw recommended to be confirmed within 3
×
811
        // blocks.
×
812
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
×
813
        if err != nil {
×
814
                return 0, err
×
815
        }
×
816

817
        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
×
818
                int64(feePerKw))
×
819

×
820
        return feePerKw, nil
×
821
}
822

823
// shouldAdjustCommitFee returns true if we should update our commitment fee to
824
// match that of the network fee. We'll only update our commitment fee if the
825
// network fee is +/- 10% to our commitment fee or if our current commitment
826
// fee is below the minimum relay fee.
827
func shouldAdjustCommitFee(netFee, chanFee,
828
        minRelayFee chainfee.SatPerKWeight) bool {
×
829

×
830
        switch {
×
831
        // If the network fee is greater than our current commitment fee and
832
        // our current commitment fee is below the minimum relay fee then
833
        // we should switch to it no matter if it is less than a 10% increase.
834
        case netFee > chanFee && chanFee < minRelayFee:
×
835
                return true
×
836

837
        // If the network fee is greater than the commitment fee, then we'll
838
        // switch to it if it's at least 10% greater than the commit fee.
839
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
×
840
                return true
×
841

842
        // If the network fee is less than our commitment fee, then we'll
843
        // switch to it if it's at least 10% less than the commitment fee.
844
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
×
845
                return true
×
846

847
        // Otherwise, we won't modify our fee.
848
        default:
×
849
                return false
×
850
        }
851
}
852

853
// failCb is used to cut down on the argument verbosity.
854
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage
855

856
// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
857
// outgoing HTLC. It may return a FailureMessage that references a channel's
858
// alias. If the channel does not have an alias, then the regular channel
859
// update from disk will be returned.
860
func (l *channelLink) createFailureWithUpdate(incoming bool,
861
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {
3✔
862

3✔
863
        // Determine which SCID to use in case we need to use aliases in the
3✔
864
        // ChannelUpdate.
3✔
865
        scid := outgoingScid
3✔
866
        if incoming {
3✔
867
                scid = l.ShortChanID()
×
868
        }
×
869

870
        // Try using the FailAliasUpdate function. If it returns nil, fallback
871
        // to the non-alias behavior.
872
        update := l.cfg.FailAliasUpdate(scid, incoming)
3✔
873
        if update == nil {
6✔
874
                // Fallback to the non-alias behavior.
3✔
875
                var err error
3✔
876
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
3✔
877
                if err != nil {
3✔
878
                        return &lnwire.FailTemporaryNodeFailure{}
×
879
                }
×
880
        }
881

882
        return cb(update)
3✔
883
}
884

885
// syncChanState attempts to synchronize channel states with the remote party.
886
// This method is to be called upon reconnection after the initial funding
887
// flow. We'll compare out commitment chains with the remote party, and re-send
888
// either a danging commit signature, a revocation, or both.
889
func (l *channelLink) syncChanStates(ctx context.Context) error {
3✔
890
        chanState := l.channel.State()
3✔
891

3✔
892
        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
3✔
893

3✔
894
        // First, we'll generate our ChanSync message to send to the other
3✔
895
        // side. Based on this message, the remote party will decide if they
3✔
896
        // need to retransmit any data or not.
3✔
897
        localChanSyncMsg, err := chanState.ChanSyncMsg()
3✔
898
        if err != nil {
3✔
899
                return fmt.Errorf("unable to generate chan sync message for "+
×
900
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
×
901
        }
×
902
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
3✔
903
                return fmt.Errorf("unable to send chan sync message for "+
×
904
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
×
905
        }
×
906

907
        var msgsToReSend []lnwire.Message
3✔
908

3✔
909
        // Next, we'll wait indefinitely to receive the ChanSync message. The
3✔
910
        // first message sent MUST be the ChanSync message.
3✔
911
        select {
3✔
912
        case msg := <-l.upstream:
3✔
913
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
3✔
914
                        l.cfg.Peer.PubKey())
3✔
915

3✔
916
                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
3✔
917
                if !ok {
3✔
918
                        return fmt.Errorf("first message sent to sync "+
×
919
                                "should be ChannelReestablish, instead "+
×
920
                                "received: %T", msg)
×
921
                }
×
922

923
                // If the remote party indicates that they think we haven't
924
                // done any state updates yet, then we'll retransmit the
925
                // channel_ready message first. We do this, as at this point
926
                // we can't be sure if they've really received the
927
                // ChannelReady message.
928
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
3✔
929
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
3✔
930
                        !l.channel.IsPending() {
6✔
931

3✔
932
                        l.log.Infof("resending ChannelReady message to peer")
3✔
933

3✔
934
                        nextRevocation, err := l.channel.NextRevocationKey()
3✔
935
                        if err != nil {
3✔
936
                                return fmt.Errorf("unable to create next "+
×
937
                                        "revocation: %v", err)
×
938
                        }
×
939

940
                        channelReadyMsg := lnwire.NewChannelReady(
3✔
941
                                l.ChanID(), nextRevocation,
3✔
942
                        )
3✔
943

3✔
944
                        // If this is a taproot channel, then we'll send the
3✔
945
                        // very same nonce that we sent above, as they should
3✔
946
                        // take the latest verification nonce we send.
3✔
947
                        if chanState.ChanType.IsTaproot() {
6✔
948
                                //nolint:ll
3✔
949
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
3✔
950
                        }
3✔
951

952
                        // For channels that negotiated the option-scid-alias
953
                        // feature bit, ensure that we send over the alias in
954
                        // the channel_ready message. We'll send the first
955
                        // alias we find for the channel since it does not
956
                        // matter which alias we send. We'll error out if no
957
                        // aliases are found.
958
                        if l.negotiatedAliasFeature() {
6✔
959
                                aliases := l.getAliases()
3✔
960
                                if len(aliases) == 0 {
3✔
961
                                        // This shouldn't happen since we
×
962
                                        // always add at least one alias before
×
963
                                        // the channel reaches the link.
×
964
                                        return fmt.Errorf("no aliases found")
×
965
                                }
×
966

967
                                // getAliases returns a copy of the alias slice
968
                                // so it is ok to use a pointer to the first
969
                                // entry.
970
                                channelReadyMsg.AliasScid = &aliases[0]
3✔
971
                        }
972

973
                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
3✔
974
                        if err != nil {
3✔
975
                                return fmt.Errorf("unable to re-send "+
×
976
                                        "ChannelReady: %v", err)
×
977
                        }
×
978
                }
979

980
                // In any case, we'll then process their ChanSync message.
981
                l.log.Info("received re-establishment message from remote side")
3✔
982

3✔
983
                var (
3✔
984
                        openedCircuits []CircuitKey
3✔
985
                        closedCircuits []CircuitKey
3✔
986
                )
3✔
987

3✔
988
                // We've just received a ChanSync message from the remote
3✔
989
                // party, so we'll process the message  in order to determine
3✔
990
                // if we need to re-transmit any messages to the remote party.
3✔
991
                ctx, cancel := l.cg.Create(ctx)
3✔
992
                defer cancel()
3✔
993
                msgsToReSend, openedCircuits, closedCircuits, err =
3✔
994
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
3✔
995
                if err != nil {
6✔
996
                        return err
3✔
997
                }
3✔
998

999
                // Repopulate any identifiers for circuits that may have been
1000
                // opened or unclosed. This may happen if we needed to
1001
                // retransmit a commitment signature message.
1002
                l.openedCircuits = openedCircuits
3✔
1003
                l.closedCircuits = closedCircuits
3✔
1004

3✔
1005
                // Ensure that all packets have been have been removed from the
3✔
1006
                // link's mailbox.
3✔
1007
                if err := l.ackDownStreamPackets(); err != nil {
3✔
1008
                        return err
×
1009
                }
×
1010

1011
                if len(msgsToReSend) > 0 {
3✔
1012
                        l.log.Infof("sending %v updates to synchronize the "+
×
1013
                                "state", len(msgsToReSend))
×
1014
                }
×
1015

1016
                // If we have any messages to retransmit, we'll do so
1017
                // immediately so we return to a synchronized state as soon as
1018
                // possible.
1019
                for _, msg := range msgsToReSend {
3✔
NEW
1020
                        err := l.cfg.Peer.SendMessage(false, msg)
×
NEW
1021
                        if err != nil {
×
NEW
1022
                                l.log.Errorf("failed to send %v: %v",
×
NEW
1023
                                        msg.MsgType(), err)
×
NEW
1024
                        }
×
1025
                }
1026

1027
        case <-l.cg.Done():
3✔
1028
                return ErrLinkShuttingDown
3✔
1029
        }
1030

1031
        return nil
3✔
1032
}
1033

1034
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1035
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1036
// we previously received are reinstated in memory, and forwarded to the switch
1037
// if necessary. After a restart, this will also delete any previously
1038
// completed packages.
1039
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
3✔
1040
        fwdPkgs, err := l.channel.LoadFwdPkgs()
3✔
1041
        if err != nil {
3✔
1042
                return err
×
1043
        }
×
1044

1045
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
3✔
1046

3✔
1047
        for _, fwdPkg := range fwdPkgs {
6✔
1048
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
3✔
1049
                        return err
×
1050
                }
×
1051
        }
1052

1053
        // If any of our reprocessing steps require an update to the commitment
1054
        // txn, we initiate a state transition to capture all relevant changes.
1055
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
6✔
1056
                return l.updateCommitTx(ctx)
3✔
1057
        }
3✔
1058

1059
        return nil
3✔
1060
}
1061

1062
// resolveFwdPkg interprets the FwdState of the provided package, either
1063
// reprocesses any outstanding htlcs in the package, or performs garbage
1064
// collection on the package.
1065
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
3✔
1066
        // Remove any completed packages to clear up space.
3✔
1067
        if fwdPkg.State == channeldb.FwdStateCompleted {
6✔
1068
                l.log.Debugf("removing completed fwd pkg for height=%d",
3✔
1069
                        fwdPkg.Height)
3✔
1070

3✔
1071
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
3✔
1072
                if err != nil {
3✔
1073
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
1074
                                "%v", fwdPkg.Height, err)
×
1075
                        return err
×
1076
                }
×
1077
        }
1078

1079
        // Otherwise this is either a new package or one has gone through
1080
        // processing, but contains htlcs that need to be restored in memory.
1081
        // We replay this forwarding package to make sure our local mem state
1082
        // is resurrected, we mimic any original responses back to the remote
1083
        // party, and re-forward the relevant HTLCs to the switch.
1084

1085
        // If the package is fully acked but not completed, it must still have
1086
        // settles and fails to propagate.
1087
        if !fwdPkg.SettleFailFilter.IsFull() {
6✔
1088
                l.processRemoteSettleFails(fwdPkg)
3✔
1089
        }
3✔
1090

1091
        // Finally, replay *ALL ADDS* in this forwarding package. The
1092
        // downstream logic is able to filter out any duplicates, but we must
1093
        // shove the entire, original set of adds down the pipeline so that the
1094
        // batch of adds presented to the sphinx router does not ever change.
1095
        if !fwdPkg.AckFilter.IsFull() {
6✔
1096
                l.processRemoteAdds(fwdPkg)
3✔
1097

3✔
1098
                // If the link failed during processing the adds, we must
3✔
1099
                // return to ensure we won't attempted to update the state
3✔
1100
                // further.
3✔
1101
                if l.failed {
3✔
1102
                        return fmt.Errorf("link failed while " +
×
1103
                                "processing remote adds")
×
1104
                }
×
1105
        }
1106

1107
        return nil
3✔
1108
}
1109

1110
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1111
// removes those that can be discarded. It is safe to do this entirely in the
1112
// background, since all state is coordinated on disk. This also ensures the
1113
// link can continue to process messages and interleave database accesses.
1114
//
1115
// NOTE: This MUST be run as a goroutine.
UNCOV
1116
func (l *channelLink) fwdPkgGarbager() {
×
UNCOV
1117
        defer l.cg.WgDone()
×
UNCOV
1118

×
UNCOV
1119
        l.cfg.FwdPkgGCTicker.Resume()
×
UNCOV
1120
        defer l.cfg.FwdPkgGCTicker.Stop()
×
UNCOV
1121

×
UNCOV
1122
        if err := l.loadAndRemove(); err != nil {
×
1123
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
1124
        }
×
1125

UNCOV
1126
        for {
×
UNCOV
1127
                select {
×
1128
                case <-l.cfg.FwdPkgGCTicker.Ticks():
×
1129
                        if err := l.loadAndRemove(); err != nil {
×
1130
                                l.log.Warnf("unable to remove fwd pkgs: %v",
×
1131
                                        err)
×
1132
                                continue
×
1133
                        }
UNCOV
1134
                case <-l.cg.Done():
×
UNCOV
1135
                        return
×
1136
                }
1137
        }
1138
}
1139

1140
// loadAndRemove loads all the channels forwarding packages and determines if
1141
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1142
// a longer tick interval can be used.
UNCOV
1143
func (l *channelLink) loadAndRemove() error {
×
UNCOV
1144
        fwdPkgs, err := l.channel.LoadFwdPkgs()
×
UNCOV
1145
        if err != nil {
×
1146
                return err
×
1147
        }
×
1148

UNCOV
1149
        var removeHeights []uint64
×
UNCOV
1150
        for _, fwdPkg := range fwdPkgs {
×
UNCOV
1151
                if fwdPkg.State != channeldb.FwdStateCompleted {
×
UNCOV
1152
                        continue
×
1153
                }
1154

UNCOV
1155
                removeHeights = append(removeHeights, fwdPkg.Height)
×
1156
        }
1157

1158
        // If removeHeights is empty, return early so we don't use a db
1159
        // transaction.
UNCOV
1160
        if len(removeHeights) == 0 {
×
UNCOV
1161
                return nil
×
UNCOV
1162
        }
×
1163

UNCOV
1164
        return l.channel.RemoveFwdPkgs(removeHeights...)
×
1165
}
1166

1167
// handleChanSyncErr performs the error handling logic in the case where we
1168
// could not successfully syncChanStates with our channel peer.
1169
func (l *channelLink) handleChanSyncErr(err error) {
3✔
1170
        l.log.Warnf("error when syncing channel states: %v", err)
3✔
1171

3✔
1172
        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
3✔
1173

3✔
1174
        switch {
3✔
1175
        case errors.Is(err, ErrLinkShuttingDown):
3✔
1176
                l.log.Debugf("unable to sync channel states, link is " +
3✔
1177
                        "shutting down")
3✔
1178
                return
3✔
1179

1180
        // We failed syncing the commit chains, probably because the remote has
1181
        // lost state. We should force close the channel.
1182
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
3✔
1183
                fallthrough
3✔
1184

1185
        // The remote sent us an invalid last commit secret, we should force
1186
        // close the channel.
1187
        // TODO(halseth): and permanently ban the peer?
1188
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
3✔
1189
                fallthrough
3✔
1190

1191
        // The remote sent us a commit point different from what they sent us
1192
        // before.
1193
        // TODO(halseth): ban peer?
1194
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
3✔
1195
                // We'll fail the link and tell the peer to force close the
3✔
1196
                // channel. Note that the database state is not updated here,
3✔
1197
                // but will be updated when the close transaction is ready to
3✔
1198
                // avoid that we go down before storing the transaction in the
3✔
1199
                // db.
3✔
1200
                l.failf(
3✔
1201
                        LinkFailureError{
3✔
1202
                                code:          ErrSyncError,
3✔
1203
                                FailureAction: LinkFailureForceClose,
3✔
1204
                        },
3✔
1205
                        "unable to synchronize channel states: %v", err,
3✔
1206
                )
3✔
1207

1208
        // We have lost state and cannot safely force close the channel. Fail
1209
        // the channel and wait for the remote to hopefully force close it. The
1210
        // remote has sent us its latest unrevoked commitment point, and we'll
1211
        // store it in the database, such that we can attempt to recover the
1212
        // funds if the remote force closes the channel.
1213
        case errors.As(err, &errDataLoss):
3✔
1214
                err := l.channel.MarkDataLoss(
3✔
1215
                        errDataLoss.CommitPoint,
3✔
1216
                )
3✔
1217
                if err != nil {
3✔
1218
                        l.log.Errorf("unable to mark channel data loss: %v",
×
1219
                                err)
×
1220
                }
×
1221

1222
        // We determined the commit chains were not possible to sync. We
1223
        // cautiously fail the channel, but don't force close.
1224
        // TODO(halseth): can we safely force close in any cases where this
1225
        // error is returned?
1226
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
×
1227
                if err := l.channel.MarkBorked(); err != nil {
×
1228
                        l.log.Errorf("unable to mark channel borked: %v", err)
×
1229
                }
×
1230

1231
        // Other, unspecified error.
1232
        default:
×
1233
        }
1234

1235
        l.failf(
3✔
1236
                LinkFailureError{
3✔
1237
                        code:          ErrRecoveryError,
3✔
1238
                        FailureAction: LinkFailureForceNone,
3✔
1239
                },
3✔
1240
                "unable to synchronize channel states: %v", err,
3✔
1241
        )
3✔
1242
}
1243

1244
// htlcManager is the primary goroutine which drives a channel's commitment
1245
// update state-machine in response to messages received via several channels.
1246
// This goroutine reads messages from the upstream (remote) peer, and also from
1247
// downstream channel managed by the channel link. In the event that an htlc
1248
// needs to be forwarded, then send-only forward handler is used which sends
1249
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1250
// all timeouts for any active HTLCs, manages the channel's revocation window,
1251
// and also the htlc trickle queue+timer for this active channels.
1252
//
1253
// NOTE: This MUST be run as a goroutine.
1254
func (l *channelLink) htlcManager(ctx context.Context) {
3✔
1255
        defer func() {
6✔
1256
                l.cfg.BatchTicker.Stop()
3✔
1257
                l.cg.WgDone()
3✔
1258
                l.log.Infof("exited")
3✔
1259
        }()
3✔
1260

1261
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
3✔
1262

3✔
1263
        // Notify any clients that the link is now in the switch via an
3✔
1264
        // ActiveLinkEvent. We'll also defer an inactive link notification for
3✔
1265
        // when the link exits to ensure that every active notification is
3✔
1266
        // matched by an inactive one.
3✔
1267
        l.cfg.NotifyActiveLink(l.ChannelPoint())
3✔
1268
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
3✔
1269

3✔
1270
        // If the link is not started for the first time, we need to take extra
3✔
1271
        // steps to resume its state.
3✔
1272
        err := l.resumeLink(ctx)
3✔
1273
        if err != nil {
6✔
1274
                l.log.Errorf("resuming link failed: %v", err)
3✔
1275
                return
3✔
1276
        }
3✔
1277

1278
        // Now that we've received both channel_ready and channel reestablish,
1279
        // we can go ahead and send the active channel notification. We'll also
1280
        // defer the inactive notification for when the link exits to ensure
1281
        // that every active notification is matched by an inactive one.
1282
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
3✔
1283
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
3✔
1284

3✔
1285
        for {
6✔
1286
                // We must always check if we failed at some point processing
3✔
1287
                // the last update before processing the next.
3✔
1288
                if l.failed {
6✔
1289
                        l.log.Errorf("link failed, exiting htlcManager")
3✔
1290
                        return
3✔
1291
                }
3✔
1292

1293
                // Pause or resume the batch ticker.
1294
                l.toggleBatchTicker()
3✔
1295

3✔
1296
                select {
3✔
1297
                // We have a new hook that needs to be run when we reach a clean
1298
                // channel state.
1299
                case hook := <-l.flushHooks.newTransients:
3✔
1300
                        if l.channel.IsChannelClean() {
6✔
1301
                                hook()
3✔
1302
                        } else {
6✔
1303
                                l.flushHooks.alloc(hook)
3✔
1304
                        }
3✔
1305

1306
                // We have a new hook that needs to be run when we have
1307
                // committed all of our updates.
1308
                case hook := <-l.outgoingCommitHooks.newTransients:
3✔
1309
                        if !l.channel.OweCommitment() {
6✔
1310
                                hook()
3✔
1311
                        } else {
3✔
1312
                                l.outgoingCommitHooks.alloc(hook)
×
1313
                        }
×
1314

1315
                // We have a new hook that needs to be run when our peer has
1316
                // committed all of their updates.
1317
                case hook := <-l.incomingCommitHooks.newTransients:
×
1318
                        if !l.channel.NeedCommitment() {
×
1319
                                hook()
×
1320
                        } else {
×
1321
                                l.incomingCommitHooks.alloc(hook)
×
1322
                        }
×
1323

1324
                // Our update fee timer has fired, so we'll check the network
1325
                // fee to see if we should adjust our commitment fee.
1326
                case <-l.updateFeeTimer.C:
×
1327
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
×
NEW
1328
                        err := l.handleUpdateFee(ctx)
×
1329
                        if err != nil {
×
NEW
1330
                                l.log.Errorf("failed to handle update fee: "+
×
NEW
1331
                                        "%v", err)
×
UNCOV
1332
                        }
×
1333

1334
                // The underlying channel has notified us of a unilateral close
1335
                // carried out by the remote peer. In the case of such an
1336
                // event, we'll wipe the channel state from the peer, and mark
1337
                // the contract as fully settled. Afterwards we can exit.
1338
                //
1339
                // TODO(roasbeef): add force closure? also breach?
1340
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
3✔
1341
                        l.log.Warnf("remote peer has closed on-chain")
3✔
1342

3✔
1343
                        // TODO(roasbeef): remove all together
3✔
1344
                        go func() {
6✔
1345
                                chanPoint := l.channel.ChannelPoint()
3✔
1346
                                l.cfg.Peer.WipeChannel(&chanPoint)
3✔
1347
                        }()
3✔
1348

1349
                        return
3✔
1350

1351
                case <-l.cfg.BatchTicker.Ticks():
3✔
1352
                        // Attempt to extend the remote commitment chain
3✔
1353
                        // including all the currently pending entries. If the
3✔
1354
                        // send was unsuccessful, then abandon the update,
3✔
1355
                        // waiting for the revocation window to open up.
3✔
1356
                        if !l.updateCommitTxOrFail(ctx) {
3✔
1357
                                return
×
1358
                        }
×
1359

1360
                case <-l.cfg.PendingCommitTicker.Ticks():
×
1361
                        l.failf(
×
1362
                                LinkFailureError{
×
1363
                                        code:          ErrRemoteUnresponsive,
×
1364
                                        FailureAction: LinkFailureDisconnect,
×
1365
                                },
×
1366
                                "unable to complete dance",
×
1367
                        )
×
1368
                        return
×
1369

1370
                // A message from the switch was just received. This indicates
1371
                // that the link is an intermediate hop in a multi-hop HTLC
1372
                // circuit.
1373
                case pkt := <-l.downstream:
3✔
1374
                        l.handleDownstreamPkt(ctx, pkt)
3✔
1375

1376
                // A message from the connected peer was just received. This
1377
                // indicates that we have a new incoming HTLC, either directly
1378
                // for us, or part of a multi-hop HTLC circuit.
1379
                case msg := <-l.upstream:
3✔
1380
                        l.handleUpstreamMsg(ctx, msg)
3✔
1381

1382
                // A htlc resolution is received. This means that we now have a
1383
                // resolution for a previously accepted htlc.
1384
                case hodlItem := <-l.hodlQueue.ChanOut():
3✔
1385
                        err := l.handleHtlcResolution(ctx, hodlItem)
3✔
1386
                        if err != nil {
3✔
NEW
1387
                                l.log.Errorf("failed to handle htlc "+
×
NEW
1388
                                        "resolution: %v", err)
×
UNCOV
1389
                        }
×
1390

1391
                // A user-initiated quiescence request is received. We now
1392
                // forward it to the quiescer.
1393
                case qReq := <-l.quiescenceReqs:
3✔
1394
                        err := l.handleQuiescenceReq(qReq)
3✔
1395
                        if err != nil {
3✔
NEW
1396
                                l.log.Errorf("failed handle quiescence "+
×
NEW
1397
                                        "req: %v", err)
×
UNCOV
1398
                        }
×
1399

1400
                case <-l.cg.Done():
3✔
1401
                        return
3✔
1402
                }
1403
        }
1404
}
1405

1406
// processHodlQueue processes a received htlc resolution and continues reading
1407
// from the hodl queue until no more resolutions remain. When this function
1408
// returns without an error, the commit tx should be updated.
1409
func (l *channelLink) processHodlQueue(ctx context.Context,
1410
        firstResolution invoices.HtlcResolution) error {
3✔
1411

3✔
1412
        // Try to read all waiting resolution messages, so that they can all be
3✔
1413
        // processed in a single commitment tx update.
3✔
1414
        htlcResolution := firstResolution
3✔
1415
loop:
3✔
1416
        for {
6✔
1417
                // Lookup all hodl htlcs that can be failed or settled with this event.
3✔
1418
                // The hodl htlc must be present in the map.
3✔
1419
                circuitKey := htlcResolution.CircuitKey()
3✔
1420
                hodlHtlc, ok := l.hodlMap[circuitKey]
3✔
1421
                if !ok {
3✔
1422
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1423
                }
×
1424

1425
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
3✔
1426
                        return err
×
1427
                }
×
1428

1429
                // Clean up hodl map.
1430
                delete(l.hodlMap, circuitKey)
3✔
1431

3✔
1432
                select {
3✔
1433
                case item := <-l.hodlQueue.ChanOut():
3✔
1434
                        htlcResolution = item.(invoices.HtlcResolution)
3✔
1435

1436
                // No need to process it if the link is broken.
1437
                case <-l.cg.Done():
×
1438
                        return ErrLinkShuttingDown
×
1439

1440
                default:
3✔
1441
                        break loop
3✔
1442
                }
1443
        }
1444

1445
        // Update the commitment tx.
1446
        if err := l.updateCommitTx(ctx); err != nil {
3✔
1447
                return err
×
1448
        }
×
1449

1450
        return nil
3✔
1451
}
1452

1453
// processHtlcResolution applies a received htlc resolution to the provided
1454
// htlc. When this function returns without an error, the commit tx should be
1455
// updated.
1456
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1457
        htlc hodlHtlc) error {
3✔
1458

3✔
1459
        circuitKey := resolution.CircuitKey()
3✔
1460

3✔
1461
        // Determine required action for the resolution based on the type of
3✔
1462
        // resolution we have received.
3✔
1463
        switch res := resolution.(type) {
3✔
1464
        // Settle htlcs that returned a settle resolution using the preimage
1465
        // in the resolution.
1466
        case *invoices.HtlcSettleResolution:
3✔
1467
                l.log.Debugf("received settle resolution for %v "+
3✔
1468
                        "with outcome: %v", circuitKey, res.Outcome)
3✔
1469

3✔
1470
                return l.settleHTLC(
3✔
1471
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
3✔
1472
                )
3✔
1473

1474
        // For htlc failures, we get the relevant failure message based
1475
        // on the failure resolution and then fail the htlc.
1476
        case *invoices.HtlcFailResolution:
3✔
1477
                l.log.Debugf("received cancel resolution for "+
3✔
1478
                        "%v with outcome: %v", circuitKey, res.Outcome)
3✔
1479

3✔
1480
                // Get the lnwire failure message based on the resolution
3✔
1481
                // result.
3✔
1482
                failure := getResolutionFailure(res, htlc.add.Amount)
3✔
1483

3✔
1484
                l.sendHTLCError(
3✔
1485
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
3✔
1486
                        true,
3✔
1487
                )
3✔
1488
                return nil
3✔
1489

1490
        // Fail if we do not get a settle of fail resolution, since we
1491
        // are only expecting to handle settles and fails.
1492
        default:
×
1493
                return fmt.Errorf("unknown htlc resolution type: %T",
×
1494
                        resolution)
×
1495
        }
1496
}
1497

1498
// getResolutionFailure returns the wire message that a htlc resolution should
1499
// be failed with.
1500
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1501
        amount lnwire.MilliSatoshi) *LinkError {
3✔
1502

3✔
1503
        // If the resolution has been resolved as part of a MPP timeout,
3✔
1504
        // we need to fail the htlc with lnwire.FailMppTimeout.
3✔
1505
        if resolution.Outcome == invoices.ResultMppTimeout {
3✔
1506
                return NewDetailedLinkError(
×
1507
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1508
                )
×
1509
        }
×
1510

1511
        // If the htlc is not a MPP timeout, we fail it with
1512
        // FailIncorrectDetails. This error is sent for invoice payment
1513
        // failures such as underpayment/ expiry too soon and hodl invoices
1514
        // (which return FailIncorrectDetails to avoid leaking information).
1515
        incorrectDetails := lnwire.NewFailIncorrectDetails(
3✔
1516
                amount, uint32(resolution.AcceptHeight),
3✔
1517
        )
3✔
1518

3✔
1519
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
3✔
1520
}
1521

1522
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
1523
// within the link's configuration that will be used to determine when the link
1524
// should propose an update to its commitment fee rate.
1525
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
3✔
1526
        lower := int64(l.cfg.MinUpdateTimeout)
3✔
1527
        upper := int64(l.cfg.MaxUpdateTimeout)
3✔
1528
        return time.Duration(prand.Int63n(upper-lower) + lower)
3✔
1529
}
3✔
1530

1531
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1532
// downstream HTLC Switch.
1533
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
1534
        pkt *htlcPacket) error {
3✔
1535

3✔
1536
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
3✔
1537
        if !ok {
3✔
1538
                return errors.New("not an UpdateAddHTLC packet")
×
1539
        }
×
1540

1541
        // If we are flushing the link in the outgoing direction or we have
1542
        // already sent Stfu, then we can't add new htlcs to the link and we
1543
        // need to bounce it.
1544
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
3✔
1545
                l.mailBox.FailAdd(pkt)
×
1546

×
1547
                return NewDetailedLinkError(
×
1548
                        &lnwire.FailTemporaryChannelFailure{},
×
1549
                        OutgoingFailureLinkNotEligible,
×
1550
                )
×
1551
        }
×
1552

1553
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1554
        // arbitrary delays between the switch adding an ADD to the
1555
        // mailbox, and the HTLC being added to the commitment state.
1556
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
3✔
1557
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1558
                l.mailBox.AckPacket(pkt.inKey())
×
1559
                return nil
×
1560
        }
×
1561

1562
        // Check if we can add the HTLC here without exceededing the max fee
1563
        // exposure threshold.
1564
        if l.isOverexposedWithHtlc(htlc, false) {
3✔
1565
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
×
1566
                        "exposure exceeded")
×
1567

×
1568
                l.mailBox.FailAdd(pkt)
×
1569

×
1570
                return NewDetailedLinkError(
×
1571
                        lnwire.NewTemporaryChannelFailure(nil),
×
1572
                        OutgoingFailureDownstreamHtlcAdd,
×
1573
                )
×
1574
        }
×
1575

1576
        // A new payment has been initiated via the downstream channel,
1577
        // so we add the new HTLC to our local log, then update the
1578
        // commitment chains.
1579
        htlc.ChanID = l.ChanID()
3✔
1580
        openCircuitRef := pkt.inKey()
3✔
1581

3✔
1582
        // We enforce the fee buffer for the commitment transaction because
3✔
1583
        // we are in control of adding this htlc. Nothing has locked-in yet so
3✔
1584
        // we can securely enforce the fee buffer which is only relevant if we
3✔
1585
        // are the initiator of the channel.
3✔
1586
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
3✔
1587
        if err != nil {
6✔
1588
                // The HTLC was unable to be added to the state machine,
3✔
1589
                // as a result, we'll signal the switch to cancel the
3✔
1590
                // pending payment.
3✔
1591
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
3✔
1592
                        err)
3✔
1593

3✔
1594
                // Remove this packet from the link's mailbox, this
3✔
1595
                // prevents it from being reprocessed if the link
3✔
1596
                // restarts and resets it mailbox. If this response
3✔
1597
                // doesn't make it back to the originating link, it will
3✔
1598
                // be rejected upon attempting to reforward the Add to
3✔
1599
                // the switch, since the circuit was never fully opened,
3✔
1600
                // and the forwarding package shows it as
3✔
1601
                // unacknowledged.
3✔
1602
                l.mailBox.FailAdd(pkt)
3✔
1603

3✔
1604
                return NewDetailedLinkError(
3✔
1605
                        lnwire.NewTemporaryChannelFailure(nil),
3✔
1606
                        OutgoingFailureDownstreamHtlcAdd,
3✔
1607
                )
3✔
1608
        }
3✔
1609

1610
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
3✔
1611
                "local_log_index=%v, pend_updates=%v",
3✔
1612
                htlc.PaymentHash[:], index,
3✔
1613
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))
3✔
1614

3✔
1615
        pkt.outgoingChanID = l.ShortChanID()
3✔
1616
        pkt.outgoingHTLCID = index
3✔
1617
        htlc.ID = index
3✔
1618

3✔
1619
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
3✔
1620
                pkt.inKey(), pkt.outKey())
3✔
1621

3✔
1622
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
3✔
1623
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
3✔
1624

3✔
1625
        err = l.cfg.Peer.SendMessage(false, htlc)
3✔
1626
        if err != nil {
3✔
NEW
1627
                l.log.Errorf("failed to send UpdateAddHTLC: %v", err)
×
NEW
1628
        }
×
1629

1630
        // Send a forward event notification to htlcNotifier.
1631
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
3✔
1632
                newHtlcKey(pkt),
3✔
1633
                HtlcInfo{
3✔
1634
                        IncomingTimeLock: pkt.incomingTimeout,
3✔
1635
                        IncomingAmt:      pkt.incomingAmount,
3✔
1636
                        OutgoingTimeLock: htlc.Expiry,
3✔
1637
                        OutgoingAmt:      htlc.Amount,
3✔
1638
                },
3✔
1639
                getEventType(pkt),
3✔
1640
        )
3✔
1641

3✔
1642
        l.tryBatchUpdateCommitTx(ctx)
3✔
1643

3✔
1644
        return nil
3✔
1645
}
1646

1647
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1648
// Switch. Possible messages sent by the switch include requests to forward new
1649
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
1650
// cleared HTLCs with the upstream peer.
1651
//
1652
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1653
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
1654
        pkt *htlcPacket) {
3✔
1655

3✔
1656
        if pkt.htlc.MsgType().IsChannelUpdate() &&
3✔
1657
                !l.quiescer.CanSendUpdates() {
3✔
1658

×
1659
                l.log.Warnf("unable to process channel update. "+
×
1660
                        "ChannelID=%v is quiescent.", l.ChanID)
×
1661

×
1662
                return
×
1663
        }
×
1664

1665
        switch htlc := pkt.htlc.(type) {
3✔
1666
        case *lnwire.UpdateAddHTLC:
3✔
1667
                // Handle add message. The returned error can be ignored,
3✔
1668
                // because it is also sent through the mailbox.
3✔
1669
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)
3✔
1670

1671
        case *lnwire.UpdateFulfillHTLC:
3✔
1672
                l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc)
3✔
1673

1674
        case *lnwire.UpdateFailHTLC:
3✔
1675
                l.processLocalUpdateFailHTLC(ctx, pkt, htlc)
3✔
1676
        }
1677
}
1678

1679
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1680
// full.
1681
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
3✔
1682
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
3✔
1683
        if pending < uint64(l.cfg.BatchSize) {
6✔
1684
                return
3✔
1685
        }
3✔
1686

1687
        l.updateCommitTxOrFail(ctx)
3✔
1688
}
1689

1690
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1691
// associated with this packet. If successful in doing so, it will also purge
1692
// the open circuit from the circuit map and remove the packet from the link's
1693
// mailbox.
1694
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
×
1695
        inKey := pkt.inKey()
×
1696

×
1697
        l.log.Debugf("cleaning up spurious response for incoming "+
×
1698
                "circuit-key=%v", inKey)
×
1699

×
1700
        // If the htlc packet doesn't have a source reference, it is unsafe to
×
1701
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
×
1702
        if pkt.sourceRef == nil {
×
1703
                l.log.Errorf("unable to cleanup response for incoming "+
×
1704
                        "circuit-key=%v, does not contain source reference",
×
1705
                        inKey)
×
1706
                return
×
1707
        }
×
1708

1709
        // If the source reference is present,  we will try to prevent this link
1710
        // from resending the packet to the switch. To do so, we ack the AddRef
1711
        // of the incoming HTLC belonging to this link.
1712
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
×
1713
        if err != nil {
×
1714
                l.log.Errorf("unable to ack AddRef for incoming "+
×
1715
                        "circuit-key=%v: %v", inKey, err)
×
1716

×
1717
                // If this operation failed, it is unsafe to attempt removal of
×
1718
                // the destination reference or circuit, so we exit early. The
×
1719
                // cleanup may proceed with a different packet in the future
×
1720
                // that succeeds on this step.
×
1721
                return
×
1722
        }
×
1723

1724
        // Now that we know this link will stop retransmitting Adds to the
1725
        // switch, we can begin to teardown the response reference and circuit
1726
        // map.
1727
        //
1728
        // If the packet includes a destination reference, then a response for
1729
        // this HTLC was locked into the outgoing channel. Attempt to remove
1730
        // this reference, so we stop retransmitting the response internally.
1731
        // Even if this fails, we will proceed in trying to delete the circuit.
1732
        // When retransmitting responses, the destination references will be
1733
        // cleaned up if an open circuit is not found in the circuit map.
1734
        if pkt.destRef != nil {
×
1735
                err := l.channel.AckSettleFails(*pkt.destRef)
×
1736
                if err != nil {
×
1737
                        l.log.Errorf("unable to ack SettleFailRef "+
×
1738
                                "for incoming circuit-key=%v: %v",
×
1739
                                inKey, err)
×
1740
                }
×
1741
        }
1742

1743
        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
×
1744

×
1745
        // With all known references acked, we can now safely delete the circuit
×
1746
        // from the switch's circuit map, as the state is no longer needed.
×
1747
        err = l.cfg.Circuits.DeleteCircuits(inKey)
×
1748
        if err != nil {
×
1749
                l.log.Errorf("unable to delete circuit for "+
×
1750
                        "circuit-key=%v: %v", inKey, err)
×
1751
        }
×
1752
}
1753

1754
// handleUpstreamMsg processes wire messages related to commitment state
1755
// updates from the upstream peer. The upstream peer is the peer whom we have a
1756
// direct channel with, updating our respective commitment chains.
1757
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
1758
        msg lnwire.Message) {
3✔
1759

3✔
1760
        l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
3✔
1761
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())
3✔
1762

3✔
1763
        // First check if the message is an update and we are capable of
3✔
1764
        // receiving updates right now.
3✔
1765
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
3✔
1766
                l.stfuFailf("update received after stfu: %T", msg)
×
1767
                return
×
1768
        }
×
1769

1770
        var err error
3✔
1771

3✔
1772
        switch msg := msg.(type) {
3✔
1773
        case *lnwire.UpdateAddHTLC:
3✔
1774
                err = l.processRemoteUpdateAddHTLC(msg)
3✔
1775

1776
        case *lnwire.UpdateFulfillHTLC:
3✔
1777
                err = l.processRemoteUpdateFulfillHTLC(msg)
3✔
1778

1779
        case *lnwire.UpdateFailMalformedHTLC:
3✔
1780
                err = l.processRemoteUpdateFailMalformedHTLC(msg)
3✔
1781

1782
        case *lnwire.UpdateFailHTLC:
3✔
1783
                err = l.processRemoteUpdateFailHTLC(msg)
3✔
1784

1785
        case *lnwire.CommitSig:
3✔
1786
                err = l.processRemoteCommitSig(ctx, msg)
3✔
1787

1788
        case *lnwire.RevokeAndAck:
3✔
1789
                err = l.processRemoteRevokeAndAck(ctx, msg)
3✔
1790

NEW
1791
        case *lnwire.UpdateFee:
×
NEW
1792
                l.processRemoteUpdateFee(msg)
×
1793

1794
        case *lnwire.Stfu:
3✔
1795
                err = l.handleStfu(msg)
3✔
1796
                if err != nil {
3✔
NEW
1797
                        l.stfuFailf("handleStfu: %v", err.Error())
×
1798
                }
×
1799

1800
        // In the case where we receive a warning message from our peer, just
1801
        // log it and move on. We choose not to disconnect from our peer,
1802
        // although we "MAY" do so according to the specification.
NEW
1803
        case *lnwire.Warning:
×
NEW
1804
                l.log.Warnf("received warning message from peer: %v",
×
NEW
1805
                        msg.Warning())
×
1806

1807
        case *lnwire.Error:
2✔
1808
                l.processRemoteError(msg)
2✔
1809

NEW
1810
        default:
×
NEW
1811
                l.log.Warnf("received unknown message of type %T", msg)
×
1812
        }
1813

1814
        if err != nil {
6✔
1815
                l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
3✔
1816
                        err)
3✔
1817
        }
3✔
1818
}
1819

1820
// handleStfu implements the top-level logic for handling the Stfu message from
1821
// our peer.
1822
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
3✔
1823
        if !l.noDanglingUpdates(lntypes.Remote) {
3✔
NEW
1824
                return ErrPendingRemoteUpdates
×
NEW
1825
        }
×
1826
        err := l.quiescer.RecvStfu(*stfu)
3✔
1827
        if err != nil {
3✔
NEW
1828
                return err
×
NEW
1829
        }
×
1830

1831
        // If we can immediately send an Stfu response back, we will.
1832
        if l.noDanglingUpdates(lntypes.Local) {
6✔
1833
                return l.quiescer.SendOwedStfu()
3✔
1834
        }
3✔
1835

NEW
1836
        return nil
×
1837
}
1838

1839
// stfuFailf fails the link in the case where the requirements of the quiescence
1840
// protocol are violated. In all cases we opt to drop the connection as only
1841
// link state (as opposed to channel state) is affected.
NEW
1842
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
×
NEW
1843
        l.failf(LinkFailureError{
×
NEW
1844
                code:             ErrStfuViolation,
×
NEW
1845
                FailureAction:    LinkFailureDisconnect,
×
NEW
1846
                PermanentFailure: false,
×
NEW
1847
                Warning:          true,
×
NEW
1848
        }, format, args...)
×
NEW
1849
}
×
1850

1851
// noDanglingUpdates returns true when there are 0 updates that were originally
1852
// issued by whose on either the Local or Remote commitment transaction.
1853
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
3✔
1854
        pendingOnLocal := l.channel.NumPendingUpdates(
3✔
1855
                whose, lntypes.Local,
3✔
1856
        )
3✔
1857
        pendingOnRemote := l.channel.NumPendingUpdates(
3✔
1858
                whose, lntypes.Remote,
3✔
1859
        )
3✔
1860

3✔
1861
        return pendingOnLocal == 0 && pendingOnRemote == 0
3✔
1862
}
3✔
1863

1864
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
1865
// for packets delivered from server, and cleaning up any circuits closed by
1866
// signing a previous commitment txn. This method ensures that the circuits are
1867
// removed from the circuit map before removing them from the link's mailbox,
1868
// otherwise it could be possible for some circuit to be missed if this link
1869
// flaps.
1870
func (l *channelLink) ackDownStreamPackets() error {
3✔
1871
        // First, remove the downstream Add packets that were included in the
3✔
1872
        // previous commitment signature. This will prevent the Adds from being
3✔
1873
        // replayed if this link disconnects.
3✔
1874
        for _, inKey := range l.openedCircuits {
6✔
1875
                // In order to test the sphinx replay logic of the remote
3✔
1876
                // party, unsafe replay does not acknowledge the packets from
3✔
1877
                // the mailbox. We can then force a replay of any Add packets
3✔
1878
                // held in memory by disconnecting and reconnecting the link.
3✔
1879
                if l.cfg.UnsafeReplay {
6✔
1880
                        continue
3✔
1881
                }
1882

1883
                l.log.Debugf("removing Add packet %s from mailbox", inKey)
3✔
1884
                l.mailBox.AckPacket(inKey)
3✔
1885
        }
1886

1887
        // Now, we will delete all circuits closed by the previous commitment
1888
        // signature, which is the result of downstream Settle/Fail packets. We
1889
        // batch them here to ensure circuits are closed atomically and for
1890
        // performance.
1891
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
3✔
1892
        switch err {
3✔
1893
        case nil:
3✔
1894
                // Successful deletion.
1895

1896
        default:
×
1897
                l.log.Errorf("unable to delete %d circuits: %v",
×
1898
                        len(l.closedCircuits), err)
×
1899
                return err
×
1900
        }
1901

1902
        // With the circuits removed from memory and disk, we now ack any
1903
        // Settle/Fails in the mailbox to ensure they do not get redelivered
1904
        // after startup. If forgive is enabled and we've reached this point,
1905
        // the circuits must have been removed at some point, so it is now safe
1906
        // to un-queue the corresponding Settle/Fails.
1907
        for _, inKey := range l.closedCircuits {
6✔
1908
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
3✔
1909
                        inKey)
3✔
1910
                l.mailBox.AckPacket(inKey)
3✔
1911
        }
3✔
1912

1913
        // Lastly, reset our buffers to be empty while keeping any acquired
1914
        // growth in the backing array.
1915
        l.openedCircuits = l.openedCircuits[:0]
3✔
1916
        l.closedCircuits = l.closedCircuits[:0]
3✔
1917

3✔
1918
        return nil
3✔
1919
}
1920

1921
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
1922
// the link.
1923
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
3✔
1924
        err := l.updateCommitTx(ctx)
3✔
1925
        switch {
3✔
1926
        // No error encountered, success.
1927
        case err == nil:
3✔
1928

1929
        // A duplicate keystone error should be resolved and is not fatal, so
1930
        // we won't send an Error message to the peer.
NEW
1931
        case errors.Is(err, ErrDuplicateKeystone):
×
1932
                l.failf(LinkFailureError{code: ErrCircuitError},
×
1933
                        "temporary circuit error: %v", err)
×
1934
                return false
×
1935

1936
        // Any other error is treated results in an Error message being sent to
1937
        // the peer.
1938
        default:
×
1939
                l.failf(LinkFailureError{code: ErrInternalError},
×
1940
                        "unable to update commitment: %v", err)
×
1941
                return false
×
1942
        }
1943

1944
        return true
3✔
1945
}
1946

1947
// updateCommitTx signs, then sends an update to the remote peer adding a new
1948
// commitment to their commitment chain which includes all the latest updates
1949
// we've received+processed up to this point.
1950
func (l *channelLink) updateCommitTx(ctx context.Context) error {
3✔
1951
        // Preemptively write all pending keystones to disk, just in case the
3✔
1952
        // HTLCs we have in memory are included in the subsequent attempt to
3✔
1953
        // sign a commitment state.
3✔
1954
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
3✔
1955
        if err != nil {
3✔
1956
                // If ErrDuplicateKeystone is returned, the caller will catch
×
1957
                // it.
×
1958
                return err
×
1959
        }
×
1960

1961
        // Reset the batch, but keep the backing buffer to avoid reallocating.
1962
        l.keystoneBatch = l.keystoneBatch[:0]
3✔
1963

3✔
1964
        // If hodl.Commit mode is active, we will refrain from attempting to
3✔
1965
        // commit any in-memory modifications to the channel state. Exiting here
3✔
1966
        // permits testing of either the switch or link's ability to trim
3✔
1967
        // circuits that have been opened, but unsuccessfully committed.
3✔
1968
        if l.cfg.HodlMask.Active(hodl.Commit) {
6✔
1969
                l.log.Warnf(hodl.Commit.Warning())
3✔
1970
                return nil
3✔
1971
        }
3✔
1972

1973
        ctx, done := l.cg.Create(ctx)
3✔
1974
        defer done()
3✔
1975

3✔
1976
        newCommit, err := l.channel.SignNextCommitment(ctx)
3✔
1977
        if err == lnwallet.ErrNoWindow {
6✔
1978
                l.cfg.PendingCommitTicker.Resume()
3✔
1979
                l.log.Trace("PendingCommitTicker resumed")
3✔
1980

3✔
1981
                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
3✔
1982
                l.log.Tracef("revocation window exhausted, unable to send: "+
3✔
1983
                        "%v, pend_updates=%v, dangling_closes%v", n,
3✔
1984
                        lnutils.SpewLogClosure(l.openedCircuits),
3✔
1985
                        lnutils.SpewLogClosure(l.closedCircuits))
3✔
1986

3✔
1987
                return nil
3✔
1988
        } else if err != nil {
6✔
1989
                return err
×
1990
        }
×
1991

1992
        if err := l.ackDownStreamPackets(); err != nil {
3✔
1993
                return err
×
1994
        }
×
1995

1996
        l.cfg.PendingCommitTicker.Pause()
3✔
1997
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
3✔
1998

3✔
1999
        // The remote party now has a new pending commitment, so we'll update
3✔
2000
        // the contract court to be aware of this new set (the prior old remote
3✔
2001
        // pending).
3✔
2002
        newUpdate := &contractcourt.ContractUpdate{
3✔
2003
                HtlcKey: contractcourt.RemotePendingHtlcSet,
3✔
2004
                Htlcs:   newCommit.PendingHTLCs,
3✔
2005
        }
3✔
2006
        err = l.cfg.NotifyContractUpdate(newUpdate)
3✔
2007
        if err != nil {
3✔
2008
                l.log.Errorf("unable to notify contract update: %v", err)
×
2009
                return err
×
2010
        }
×
2011

2012
        select {
3✔
2013
        case <-l.cg.Done():
×
2014
                return ErrLinkShuttingDown
×
2015
        default:
3✔
2016
        }
2017

2018
        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
3✔
2019
        if err != nil {
3✔
2020
                return fmt.Errorf("error parsing aux sigs: %w", err)
×
2021
        }
×
2022

2023
        commitSig := &lnwire.CommitSig{
3✔
2024
                ChanID:        l.ChanID(),
3✔
2025
                CommitSig:     newCommit.CommitSig,
3✔
2026
                HtlcSigs:      newCommit.HtlcSigs,
3✔
2027
                PartialSig:    newCommit.PartialSig,
3✔
2028
                CustomRecords: auxBlobRecords,
3✔
2029
        }
3✔
2030
        err = l.cfg.Peer.SendMessage(false, commitSig)
3✔
2031
        if err != nil {
3✔
NEW
2032
                l.log.Errorf("failed to send CommitSig: %v", err)
×
NEW
2033
        }
×
2034

2035
        // Now that we have sent out a new CommitSig, we invoke the outgoing set
2036
        // of commit hooks.
2037
        l.RWMutex.Lock()
3✔
2038
        l.outgoingCommitHooks.invoke()
3✔
2039
        l.RWMutex.Unlock()
3✔
2040

3✔
2041
        return nil
3✔
2042
}
2043

2044
// Peer returns the representation of remote peer with which we have the
2045
// channel link opened.
2046
//
2047
// NOTE: Part of the ChannelLink interface.
2048
func (l *channelLink) PeerPubKey() [33]byte {
3✔
2049
        return l.cfg.Peer.PubKey()
3✔
2050
}
3✔
2051

2052
// ChannelPoint returns the channel outpoint for the channel link.
2053
// NOTE: Part of the ChannelLink interface.
2054
func (l *channelLink) ChannelPoint() wire.OutPoint {
3✔
2055
        return l.channel.ChannelPoint()
3✔
2056
}
3✔
2057

2058
// ShortChanID returns the short channel ID for the channel link. The short
2059
// channel ID encodes the exact location in the main chain that the original
2060
// funding output can be found.
2061
//
2062
// NOTE: Part of the ChannelLink interface.
2063
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
3✔
2064
        l.RLock()
3✔
2065
        defer l.RUnlock()
3✔
2066

3✔
2067
        return l.channel.ShortChanID()
3✔
2068
}
3✔
2069

2070
// UpdateShortChanID updates the short channel ID for a link. This may be
2071
// required in the event that a link is created before the short chan ID for it
2072
// is known, or a re-org occurs, and the funding transaction changes location
2073
// within the chain.
2074
//
2075
// NOTE: Part of the ChannelLink interface.
2076
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
3✔
2077
        chanID := l.ChanID()
3✔
2078

3✔
2079
        // Refresh the channel state's short channel ID by loading it from disk.
3✔
2080
        // This ensures that the channel state accurately reflects the updated
3✔
2081
        // short channel ID.
3✔
2082
        err := l.channel.State().Refresh()
3✔
2083
        if err != nil {
3✔
2084
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
×
2085
                        "%v", chanID, err)
×
2086
                return hop.Source, err
×
2087
        }
×
2088

2089
        return hop.Source, nil
3✔
2090
}
2091

2092
// ChanID returns the channel ID for the channel link. The channel ID is a more
2093
// compact representation of a channel's full outpoint.
2094
//
2095
// NOTE: Part of the ChannelLink interface.
2096
func (l *channelLink) ChanID() lnwire.ChannelID {
3✔
2097
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
3✔
2098
}
3✔
2099

2100
// Bandwidth returns the total amount that can flow through the channel link at
2101
// this given instance. The value returned is expressed in millisatoshi and can
2102
// be used by callers when making forwarding decisions to determine if a link
2103
// can accept an HTLC.
2104
//
2105
// NOTE: Part of the ChannelLink interface.
2106
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
3✔
2107
        // Get the balance available on the channel for new HTLCs. This takes
3✔
2108
        // the channel reserve into account so HTLCs up to this value won't
3✔
2109
        // violate it.
3✔
2110
        return l.channel.AvailableBalance()
3✔
2111
}
3✔
2112

2113
// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
2114
// amount provided to the link. This check does not reserve a space, since
2115
// forwards or other payments may use the available slot, so it should be
2116
// considered best-effort.
2117
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
3✔
2118
        return l.channel.MayAddOutgoingHtlc(amt)
3✔
2119
}
3✔
2120

2121
// getDustSum is a wrapper method that calls the underlying channel's dust sum
2122
// method.
2123
//
2124
// NOTE: Part of the dustHandler interface.
2125
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2126
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
3✔
2127

3✔
2128
        return l.channel.GetDustSum(whoseCommit, dryRunFee)
3✔
2129
}
3✔
2130

2131
// getFeeRate is a wrapper method that retrieves the underlying channel's
2132
// feerate.
2133
//
2134
// NOTE: Part of the dustHandler interface.
2135
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
3✔
2136
        return l.channel.CommitFeeRate()
3✔
2137
}
3✔
2138

2139
// getDustClosure returns a closure that can be used by the switch or mailbox
2140
// to evaluate whether a given HTLC is dust.
2141
//
2142
// NOTE: Part of the dustHandler interface.
2143
func (l *channelLink) getDustClosure() dustClosure {
3✔
2144
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
3✔
2145
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
3✔
2146
        chanType := l.channel.State().ChanType
3✔
2147

3✔
2148
        return dustHelper(chanType, localDustLimit, remoteDustLimit)
3✔
2149
}
3✔
2150

2151
// getCommitFee returns either the local or remote CommitFee in satoshis. This
2152
// is used so that the Switch can have access to the commitment fee without
2153
// needing to have a *LightningChannel. This doesn't include dust.
2154
//
2155
// NOTE: Part of the dustHandler interface.
2156
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
3✔
2157
        if remote {
6✔
2158
                return l.channel.State().RemoteCommitment.CommitFee
3✔
2159
        }
3✔
2160

2161
        return l.channel.State().LocalCommitment.CommitFee
3✔
2162
}
2163

2164
// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
2165
// increases the total dust and fees within the channel past the configured
2166
// fee threshold. It first calculates the dust sum over every update in the
2167
// update log with the proposed fee-rate and taking into account both the local
2168
// and remote dust limits. It uses every update in the update log instead of
2169
// what is actually on the local and remote commitments because it is assumed
2170
// that in a worst-case scenario, every update in the update log could
2171
// theoretically be on either commitment transaction and this needs to be
2172
// accounted for with this fee-rate. It then calculates the local and remote
2173
// commitment fees given the proposed fee-rate. Finally, it tallies the results
2174
// and determines if the fee threshold has been exceeded.
2175
func (l *channelLink) exceedsFeeExposureLimit(
2176
        feePerKw chainfee.SatPerKWeight) (bool, error) {
×
2177

×
2178
        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)
×
2179

×
2180
        // Get the sum of dust for both the local and remote commitments using
×
2181
        // this "dry-run" fee.
×
2182
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
×
2183
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)
×
2184

×
2185
        // Calculate the local and remote commitment fees using this dry-run
×
2186
        // fee.
×
2187
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
×
2188
        if err != nil {
×
2189
                return false, err
×
2190
        }
×
2191

2192
        // Finally, check whether the max fee exposure was exceeded on either
2193
        // future commitment transaction with the fee-rate.
2194
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
×
2195
        if totalLocalDust > l.cfg.MaxFeeExposure {
×
2196
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
2197
                        "local dust: %v, local fee: %v", l.ShortChanID(),
×
2198
                        totalLocalDust, localFee)
×
2199

×
2200
                return true, nil
×
2201
        }
×
2202

2203
        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
×
2204
                remoteFee,
×
2205
        )
×
2206

×
2207
        if totalRemoteDust > l.cfg.MaxFeeExposure {
×
2208
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
2209
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
×
2210
                        totalRemoteDust, remoteFee)
×
2211

×
2212
                return true, nil
×
2213
        }
×
2214

2215
        return false, nil
×
2216
}
2217

2218
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
2219
// channel exceed the fee threshold. It first fetches the largest fee-rate that
2220
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
2221
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
2222
// the overall dust sum. If it is not dust, it contributes to weight, which
2223
// also adds to the overall dust sum by an increase in fees. If the dust sum on
2224
// either commitment exceeds the configured fee threshold, this function
2225
// returns true.
2226
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
2227
        incoming bool) bool {
3✔
2228

3✔
2229
        dustClosure := l.getDustClosure()
3✔
2230

3✔
2231
        feeRate := l.channel.WorstCaseFeeRate()
3✔
2232

3✔
2233
        amount := htlc.Amount.ToSatoshis()
3✔
2234

3✔
2235
        // See if this HTLC is dust on both the local and remote commitments.
3✔
2236
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
3✔
2237
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)
3✔
2238

3✔
2239
        // Calculate the dust sum for the local and remote commitments.
3✔
2240
        localDustSum := l.getDustSum(
3✔
2241
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
3✔
2242
        )
3✔
2243
        remoteDustSum := l.getDustSum(
3✔
2244
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
3✔
2245
        )
3✔
2246

3✔
2247
        // Grab the larger of the local and remote commitment fees w/o dust.
3✔
2248
        commitFee := l.getCommitFee(false)
3✔
2249

3✔
2250
        if l.getCommitFee(true) > commitFee {
5✔
2251
                commitFee = l.getCommitFee(true)
2✔
2252
        }
2✔
2253

2254
        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)
3✔
2255

3✔
2256
        localDustSum += commitFeeMSat
3✔
2257
        remoteDustSum += commitFeeMSat
3✔
2258

3✔
2259
        // Calculate the additional fee increase if this is a non-dust HTLC.
3✔
2260
        weight := lntypes.WeightUnit(input.HTLCWeight)
3✔
2261
        additional := lnwire.NewMSatFromSatoshis(
3✔
2262
                feeRate.FeeForWeight(weight),
3✔
2263
        )
3✔
2264

3✔
2265
        if isLocalDust {
6✔
2266
                // If this is dust, it doesn't contribute to weight but does
3✔
2267
                // contribute to the overall dust sum.
3✔
2268
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
3✔
2269
        } else {
6✔
2270
                // Account for the fee increase that comes with an increase in
3✔
2271
                // weight.
3✔
2272
                localDustSum += additional
3✔
2273
        }
3✔
2274

2275
        if localDustSum > l.cfg.MaxFeeExposure {
3✔
2276
                // The max fee exposure was exceeded.
×
2277
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
×
2278
                        "overexposed, total local dust: %v (current commit "+
×
2279
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)
×
2280

×
2281
                return true
×
2282
        }
×
2283

2284
        if isRemoteDust {
6✔
2285
                // If this is dust, it doesn't contribute to weight but does
3✔
2286
                // contribute to the overall dust sum.
3✔
2287
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
3✔
2288
        } else {
6✔
2289
                // Account for the fee increase that comes with an increase in
3✔
2290
                // weight.
3✔
2291
                remoteDustSum += additional
3✔
2292
        }
3✔
2293

2294
        if remoteDustSum > l.cfg.MaxFeeExposure {
3✔
2295
                // The max fee exposure was exceeded.
×
2296
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
×
2297
                        "overexposed, total remote dust: %v (current commit "+
×
2298
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)
×
2299

×
2300
                return true
×
2301
        }
×
2302

2303
        return false
3✔
2304
}
2305

2306
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
2307
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
2308
// the HTLC is incoming (i.e. one that the remote sent), a boolean denoting
2309
// whether to evaluate on the local or remote commit, and finally an HTLC
2310
// amount to test.
2311
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
2312
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
2313

2314
// dustHelper is used to construct the dustClosure.
2315
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
2316
        remoteDustLimit btcutil.Amount) dustClosure {
3✔
2317

3✔
2318
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
3✔
2319
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
6✔
2320

3✔
2321
                var dustLimit btcutil.Amount
3✔
2322
                if whoseCommit.IsLocal() {
6✔
2323
                        dustLimit = localDustLimit
3✔
2324
                } else {
6✔
2325
                        dustLimit = remoteDustLimit
3✔
2326
                }
3✔
2327

2328
                return lnwallet.HtlcIsDust(
3✔
2329
                        chantype, incoming, whoseCommit, feerate, amt,
3✔
2330
                        dustLimit,
3✔
2331
                )
3✔
2332
        }
2333

2334
        return isDust
3✔
2335
}
2336

2337
// zeroConfConfirmed returns whether or not the zero-conf channel has
2338
// confirmed on-chain.
2339
//
2340
// Part of the scidAliasHandler interface.
2341
func (l *channelLink) zeroConfConfirmed() bool {
3✔
2342
        return l.channel.State().ZeroConfConfirmed()
3✔
2343
}
3✔
2344

2345
// confirmedScid returns the confirmed SCID for a zero-conf channel. This
2346
// should not be called for non-zero-conf channels.
2347
//
2348
// Part of the scidAliasHandler interface.
2349
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
3✔
2350
        return l.channel.State().ZeroConfRealScid()
3✔
2351
}
3✔
2352

2353
// isZeroConf returns whether or not the underlying channel is a zero-conf
2354
// channel.
2355
//
2356
// Part of the scidAliasHandler interface.
2357
func (l *channelLink) isZeroConf() bool {
3✔
2358
        return l.channel.State().IsZeroConf()
3✔
2359
}
3✔
2360

2361
// negotiatedAliasFeature returns whether or not the underlying channel has
2362
// negotiated the option-scid-alias feature bit. This will be true for both
2363
// option-scid-alias and zero-conf channel-types. It will also be true for
2364
// channels with the feature bit but without the above channel-types.
2365
//
2366
// Part of the scidAliasFeature interface.
2367
func (l *channelLink) negotiatedAliasFeature() bool {
3✔
2368
        return l.channel.State().NegotiatedAliasFeature()
3✔
2369
}
3✔
2370

2371
// getAliases returns the set of aliases for the underlying channel.
2372
//
2373
// Part of the scidAliasHandler interface.
2374
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
3✔
2375
        return l.cfg.GetAliases(l.ShortChanID())
3✔
2376
}
3✔
2377

2378
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
2379
//
2380
// Part of the scidAliasHandler interface.
2381
func (l *channelLink) attachFailAliasUpdate(closure func(
2382
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
3✔
2383

3✔
2384
        l.Lock()
3✔
2385
        l.cfg.FailAliasUpdate = closure
3✔
2386
        l.Unlock()
3✔
2387
}
3✔
2388

2389
// AttachMailBox updates the current mailbox used by this link, and hooks up
2390
// the mailbox's message and packet outboxes to the link's upstream and
2391
// downstream chans, respectively.
2392
func (l *channelLink) AttachMailBox(mailbox MailBox) {
3✔
2393
        l.Lock()
3✔
2394
        l.mailBox = mailbox
3✔
2395
        l.upstream = mailbox.MessageOutBox()
3✔
2396
        l.downstream = mailbox.PacketOutBox()
3✔
2397
        l.Unlock()
3✔
2398

3✔
2399
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
3✔
2400
        // never committed.
3✔
2401
        l.mailBox.SetFeeRate(l.getFeeRate())
3✔
2402

3✔
2403
        // Also set the mailbox's dust closure so that it can query whether HTLC's
3✔
2404
        // are dust given the current feerate.
3✔
2405
        l.mailBox.SetDustClosure(l.getDustClosure())
3✔
2406
}
3✔
2407

2408
// UpdateForwardingPolicy updates the forwarding policy for the target
2409
// ChannelLink. Once updated, the link will use the new forwarding policy to
2410
// govern if it an incoming HTLC should be forwarded or not. We assume that
2411
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
2412
// update all of the link's FwrdingPolicy's values.
2413
//
2414
// NOTE: Part of the ChannelLink interface.
2415
func (l *channelLink) UpdateForwardingPolicy(
2416
        newPolicy models.ForwardingPolicy) {
3✔
2417

3✔
2418
        l.Lock()
3✔
2419
        defer l.Unlock()
3✔
2420

3✔
2421
        l.cfg.FwrdingPolicy = newPolicy
3✔
2422
}
3✔
2423

2424
// CheckHtlcForward should return a nil error if the passed HTLC details
2425
// satisfy the current forwarding policy fo the target link. Otherwise,
2426
// a LinkError with a valid protocol failure message should be returned
2427
// in order to signal to the source of the HTLC, the policy consistency
2428
// issue.
2429
//
2430
// NOTE: Part of the ChannelLink interface.
2431
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
2432
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
2433
        outgoingTimeout uint32, inboundFee models.InboundFee,
2434
        heightNow uint32, originalScid lnwire.ShortChannelID,
2435
        customRecords lnwire.CustomRecords) *LinkError {
3✔
2436

3✔
2437
        l.RLock()
3✔
2438
        policy := l.cfg.FwrdingPolicy
3✔
2439
        l.RUnlock()
3✔
2440

3✔
2441
        // Using the outgoing HTLC amount, we'll calculate the outgoing
3✔
2442
        // fee this incoming HTLC must carry in order to satisfy the constraints
3✔
2443
        // of the outgoing link.
3✔
2444
        outFee := ExpectedFee(policy, amtToForward)
3✔
2445

3✔
2446
        // Then calculate the inbound fee that we charge based on the sum of
3✔
2447
        // outgoing HTLC amount and outgoing fee.
3✔
2448
        inFee := inboundFee.CalcFee(amtToForward + outFee)
3✔
2449

3✔
2450
        // Add up both fee components. It is important to calculate both fees
3✔
2451
        // separately. An alternative way of calculating is to first determine
3✔
2452
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
3✔
2453
        // rounding may cause the result to be slightly higher than in the case
3✔
2454
        // of separately rounded fee components. This potentially causes failed
3✔
2455
        // forwards for senders and is something to be avoided.
3✔
2456
        expectedFee := inFee + int64(outFee)
3✔
2457

3✔
2458
        // If the actual fee is less than our expected fee, then we'll reject
3✔
2459
        // this HTLC as it didn't provide a sufficient amount of fees, or the
3✔
2460
        // values have been tampered with, or the send used incorrect/dated
3✔
2461
        // information to construct the forwarding information for this hop. In
3✔
2462
        // any case, we'll cancel this HTLC.
3✔
2463
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
3✔
2464
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
6✔
2465
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
3✔
2466
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
3✔
2467
                        "inboundFee=%v",
3✔
2468
                        payHash[:], expectedFee, actualFee,
3✔
2469
                        incomingHtlcAmt, amtToForward, inboundFee,
3✔
2470
                )
3✔
2471

3✔
2472
                // As part of the returned error, we'll send our latest routing
3✔
2473
                // policy so the sending node obtains the most up to date data.
3✔
2474
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
2475
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
3✔
2476
                }
3✔
2477
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
2478
                return NewLinkError(failure)
3✔
2479
        }
2480

2481
        // Check whether the outgoing htlc satisfies the channel policy.
2482
        err := l.canSendHtlc(
3✔
2483
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
3✔
2484
                originalScid, customRecords,
3✔
2485
        )
3✔
2486
        if err != nil {
6✔
2487
                return err
3✔
2488
        }
3✔
2489

2490
        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
2491
        // the following constraint: the incoming time-lock minus our time-lock
2492
        // delta should equal the outgoing time lock. Otherwise, whether the
2493
        // sender messed up, or an intermediate node tampered with the HTLC.
2494
        timeDelta := policy.TimeLockDelta
3✔
2495
        if incomingTimeout < outgoingTimeout+timeDelta {
3✔
2496
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
×
2497
                        "expected at least %v block delta, got %v block delta",
×
2498
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)
×
2499

×
2500
                // Grab the latest routing policy so the sending node is up to
×
2501
                // date with our current policy.
×
2502
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
×
2503
                        return lnwire.NewIncorrectCltvExpiry(
×
2504
                                incomingTimeout, *upd,
×
2505
                        )
×
2506
                }
×
2507
                failure := l.createFailureWithUpdate(false, originalScid, cb)
×
2508
                return NewLinkError(failure)
×
2509
        }
2510

2511
        return nil
3✔
2512
}
2513

2514
// CheckHtlcTransit should return a nil error if the passed HTLC details
2515
// satisfy the current channel policy.  Otherwise, a LinkError with a
2516
// valid protocol failure message should be returned in order to signal
2517
// the violation. This call is intended to be used for locally initiated
2518
// payments for which there is no corresponding incoming htlc.
2519
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
2520
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
2521
        customRecords lnwire.CustomRecords) *LinkError {
3✔
2522

3✔
2523
        l.RLock()
3✔
2524
        policy := l.cfg.FwrdingPolicy
3✔
2525
        l.RUnlock()
3✔
2526

3✔
2527
        // We pass in hop.Source here as this is only used in the Switch when
3✔
2528
        // trying to send over a local link. This causes the fallback mechanism
3✔
2529
        // to occur.
3✔
2530
        return l.canSendHtlc(
3✔
2531
                policy, payHash, amt, timeout, heightNow, hop.Source,
3✔
2532
                customRecords,
3✔
2533
        )
3✔
2534
}
3✔
2535

2536
// canSendHtlc checks whether the given htlc parameters satisfy
2537
// the channel's amount and time lock constraints.
2538
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
2539
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
2540
        heightNow uint32, originalScid lnwire.ShortChannelID,
2541
        customRecords lnwire.CustomRecords) *LinkError {
3✔
2542

3✔
2543
        // As our first sanity check, we'll ensure that the passed HTLC isn't
3✔
2544
        // too small for the next hop. If so, then we'll cancel the HTLC
3✔
2545
        // directly.
3✔
2546
        if amt < policy.MinHTLCOut {
6✔
2547
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
3✔
2548
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
3✔
2549
                        amt)
3✔
2550

3✔
2551
                // As part of the returned error, we'll send our latest routing
3✔
2552
                // policy so the sending node obtains the most up to date data.
3✔
2553
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
2554
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
3✔
2555
                }
3✔
2556
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
2557
                return NewLinkError(failure)
3✔
2558
        }
2559

2560
        // Next, ensure that the passed HTLC isn't too large. If so, we'll
2561
        // cancel the HTLC directly.
2562
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
6✔
2563
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
3✔
2564
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)
3✔
2565

3✔
2566
                // As part of the returned error, we'll send our latest routing
3✔
2567
                // policy so the sending node obtains the most up-to-date data.
3✔
2568
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
2569
                        return lnwire.NewTemporaryChannelFailure(upd)
3✔
2570
                }
3✔
2571
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
2572
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
3✔
2573
        }
2574

2575
        // We want to avoid offering an HTLC which will expire in the near
2576
        // future, so we'll reject an HTLC if the outgoing expiration time is
2577
        // too close to the current height.
2578
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
3✔
2579
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
×
2580
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
×
2581
                        timeout, heightNow)
×
2582

×
2583
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
×
2584
                        return lnwire.NewExpiryTooSoon(*upd)
×
2585
                }
×
2586
                failure := l.createFailureWithUpdate(false, originalScid, cb)
×
2587
                return NewLinkError(failure)
×
2588
        }
2589

2590
        // Check absolute max delta.
2591
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
3✔
2592
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
×
2593
                        "the future: got %v, but maximum is %v", payHash[:],
×
2594
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)
×
2595

×
2596
                return NewLinkError(&lnwire.FailExpiryTooFar{})
×
2597
        }
×
2598

2599
        // We now check the available bandwidth to see if this HTLC can be
2600
        // forwarded.
2601
        availableBandwidth := l.Bandwidth()
3✔
2602
        auxBandwidth, err := fn.MapOptionZ(
3✔
2603
                l.cfg.AuxTrafficShaper,
3✔
2604
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
3✔
2605
                        var htlcBlob fn.Option[tlv.Blob]
×
2606
                        blob, err := customRecords.Serialize()
×
2607
                        if err != nil {
×
2608
                                return fn.Err[OptionalBandwidth](
×
2609
                                        fmt.Errorf("unable to serialize "+
×
2610
                                                "custom records: %w", err))
×
2611
                        }
×
2612

2613
                        if len(blob) > 0 {
×
2614
                                htlcBlob = fn.Some(blob)
×
2615
                        }
×
2616

2617
                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
×
2618
                },
2619
        ).Unpack()
2620
        if err != nil {
3✔
2621
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
×
2622
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
×
2623
        }
×
2624

2625
        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
3✔
2626
                auxBandwidth.Bandwidth.WhenSome(
×
2627
                        func(bandwidth lnwire.MilliSatoshi) {
×
2628
                                availableBandwidth = bandwidth
×
2629
                        },
×
2630
                )
2631
        }
2632

2633
        // Check to see if there is enough balance in this channel.
2634
        if amt > availableBandwidth {
6✔
2635
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
3✔
2636
                        "larger than %v", amt, availableBandwidth)
3✔
2637
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
2638
                        return lnwire.NewTemporaryChannelFailure(upd)
3✔
2639
                }
3✔
2640
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
2641
                return NewDetailedLinkError(
3✔
2642
                        failure, OutgoingFailureInsufficientBalance,
3✔
2643
                )
3✔
2644
        }
2645

2646
        return nil
3✔
2647
}
2648

2649
// AuxBandwidth returns the bandwidth that can be used for a channel, expressed
2650
// in milli-satoshi. This might be different from the regular BTC bandwidth for
2651
// custom channels. This will always return fn.None() for a regular (non-custom)
2652
// channel.
2653
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
2654
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
2655
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
×
2656

×
2657
        fundingBlob := l.FundingCustomBlob()
×
2658
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
×
2659
        if err != nil {
×
2660
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
×
2661
                        "failed to decide whether to handle traffic: %w", err))
×
2662
        }
×
2663

2664
        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
×
2665
                "traffic: %v", cid, shouldHandle)
×
2666

×
2667
        // If this channel isn't handled by the aux traffic shaper, we'll return
×
2668
        // early.
×
2669
        if !shouldHandle {
×
2670
                return fn.Ok(OptionalBandwidth{
×
2671
                        IsHandled: false,
×
2672
                })
×
2673
        }
×
2674

2675
        // Ask for a specific bandwidth to be used for the channel.
2676
        commitmentBlob := l.CommitmentCustomBlob()
×
2677
        auxBandwidth, err := ts.PaymentBandwidth(
×
2678
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
×
2679
                l.channel.FetchLatestAuxHTLCView(),
×
2680
        )
×
2681
        if err != nil {
×
2682
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
×
2683
                        "bandwidth from external traffic shaper: %w", err))
×
2684
        }
×
2685

2686
        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
×
2687
                "bandwidth: %v", cid, auxBandwidth)
×
2688

×
2689
        return fn.Ok(OptionalBandwidth{
×
2690
                IsHandled: true,
×
2691
                Bandwidth: fn.Some(auxBandwidth),
×
2692
        })
×
2693
}
2694

2695
// Stats returns the statistics of channel link.
2696
//
2697
// NOTE: Part of the ChannelLink interface.
2698
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
3✔
2699
        snapshot := l.channel.StateSnapshot()
3✔
2700

3✔
2701
        return snapshot.ChannelCommitment.CommitHeight,
3✔
2702
                snapshot.TotalMSatSent,
3✔
2703
                snapshot.TotalMSatReceived
3✔
2704
}
3✔
2705

2706
// String returns the string representation of channel link.
2707
//
2708
// NOTE: Part of the ChannelLink interface.
2709
func (l *channelLink) String() string {
×
2710
        return l.channel.ChannelPoint().String()
×
2711
}
×
2712

2713
// handleSwitchPacket handles the switch packets. This packets which might be
2714
// forwarded to us from another channel link in case the htlc update came from
2715
// another peer or if the update was created by user
2716
//
2717
// NOTE: Part of the packetHandler interface.
2718
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
3✔
2719
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
3✔
2720
                pkt.inKey(), pkt.outKey())
3✔
2721

3✔
2722
        return l.mailBox.AddPacket(pkt)
3✔
2723
}
3✔
2724

2725
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
2726
// to us from remote peer we have a channel with.
2727
//
2728
// NOTE: Part of the ChannelLink interface.
2729
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
3✔
2730
        select {
3✔
2731
        case <-l.cg.Done():
×
2732
                // Return early if the link is already in the process of
×
2733
                // quitting. It doesn't make sense to hand the message to the
×
2734
                // mailbox here.
×
2735
                return
×
2736
        default:
3✔
2737
        }
2738

2739
        err := l.mailBox.AddMessage(message)
3✔
2740
        if err != nil {
3✔
2741
                l.log.Errorf("failed to add Message to mailbox: %v", err)
×
2742
        }
×
2743
}
2744

2745
// updateChannelFee updates the commitment fee-per-kw on this channel by
2746
// committing to an update_fee message.
2747
func (l *channelLink) updateChannelFee(ctx context.Context,
2748
        feePerKw chainfee.SatPerKWeight) error {
×
2749

×
2750
        l.log.Infof("updating commit fee to %v", feePerKw)
×
2751

×
2752
        // We skip sending the UpdateFee message if the channel is not
×
2753
        // currently eligible to forward messages.
×
2754
        if !l.eligibleToUpdate() {
×
2755
                l.log.Debugf("skipping fee update for inactive channel")
×
2756
                return nil
×
2757
        }
×
2758

2759
        // Check and see if our proposed fee-rate would make us exceed the fee
2760
        // threshold.
2761
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
×
2762
        if err != nil {
×
2763
                // This shouldn't typically happen. If it does, it indicates
×
2764
                // something is wrong with our channel state.
×
2765
                return err
×
2766
        }
×
2767

2768
        if thresholdExceeded {
×
2769
                return fmt.Errorf("link fee threshold exceeded")
×
2770
        }
×
2771

2772
        // First, we'll update the local fee on our commitment.
2773
        if err := l.channel.UpdateFee(feePerKw); err != nil {
×
2774
                return err
×
2775
        }
×
2776

2777
        // The fee passed the channel's validation checks, so we update the
2778
        // mailbox feerate.
2779
        l.mailBox.SetFeeRate(feePerKw)
×
2780

×
2781
        // We'll then attempt to send a new UpdateFee message, and also lock it
×
2782
        // in immediately by triggering a commitment update.
×
2783
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
×
2784
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
×
2785
                return err
×
2786
        }
×
2787

2788
        return l.updateCommitTx(ctx)
×
2789
}
2790

2791
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
2792
// after receiving a revocation from the remote party, and reprocesses them in
2793
// the context of the provided forwarding package. Any settles or fails that
2794
// have already been acknowledged in the forwarding package will not be sent to
2795
// the switch.
2796
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
3✔
2797
        if len(fwdPkg.SettleFails) == 0 {
6✔
2798
                l.log.Trace("fwd package has no settle/fails to process " +
3✔
2799
                        "exiting early")
3✔
2800

3✔
2801
                return
3✔
2802
        }
3✔
2803

2804
        // Exit early if the fwdPkg is already processed.
2805
        if fwdPkg.State == channeldb.FwdStateCompleted {
3✔
2806
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
×
2807

×
2808
                return
×
2809
        }
×
2810

2811
        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
3✔
2812

3✔
2813
        var switchPackets []*htlcPacket
3✔
2814
        for i, update := range fwdPkg.SettleFails {
6✔
2815
                destRef := fwdPkg.DestRef(uint16(i))
3✔
2816

3✔
2817
                // Skip any settles or fails that have already been
3✔
2818
                // acknowledged by the incoming link that originated the
3✔
2819
                // forwarded Add.
3✔
2820
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
3✔
2821
                        continue
×
2822
                }
2823

2824
                // TODO(roasbeef): rework log entries to a shared
2825
                // interface.
2826

2827
                switch msg := update.UpdateMsg.(type) {
3✔
2828
                // A settle for an HTLC we previously forwarded HTLC has been
2829
                // received. So we'll forward the HTLC to the switch which will
2830
                // handle propagating the settle to the prior hop.
2831
                case *lnwire.UpdateFulfillHTLC:
3✔
2832
                        // If hodl.SettleIncoming is requested, we will not
3✔
2833
                        // forward the SETTLE to the switch and will not signal
3✔
2834
                        // a free slot on the commitment transaction.
3✔
2835
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
3✔
2836
                                l.log.Warnf(hodl.SettleIncoming.Warning())
×
2837
                                continue
×
2838
                        }
2839

2840
                        settlePacket := &htlcPacket{
3✔
2841
                                outgoingChanID: l.ShortChanID(),
3✔
2842
                                outgoingHTLCID: msg.ID,
3✔
2843
                                destRef:        &destRef,
3✔
2844
                                htlc:           msg,
3✔
2845
                        }
3✔
2846

3✔
2847
                        // Add the packet to the batch to be forwarded, and
3✔
2848
                        // notify the overflow queue that a spare spot has been
3✔
2849
                        // freed up within the commitment state.
3✔
2850
                        switchPackets = append(switchPackets, settlePacket)
3✔
2851

2852
                // A failureCode message for a previously forwarded HTLC has
2853
                // been received. As a result a new slot will be freed up in
2854
                // our commitment state, so we'll forward this to the switch so
2855
                // the backwards undo can continue.
2856
                case *lnwire.UpdateFailHTLC:
3✔
2857
                        // If hodl.SettleIncoming is requested, we will not
3✔
2858
                        // forward the FAIL to the switch and will not signal a
3✔
2859
                        // free slot on the commitment transaction.
3✔
2860
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
3✔
2861
                                l.log.Warnf(hodl.FailIncoming.Warning())
×
2862
                                continue
×
2863
                        }
2864

2865
                        // Fetch the reason the HTLC was canceled so we can
2866
                        // continue to propagate it. This failure originated
2867
                        // from another node, so the linkFailure field is not
2868
                        // set on the packet.
2869
                        failPacket := &htlcPacket{
3✔
2870
                                outgoingChanID: l.ShortChanID(),
3✔
2871
                                outgoingHTLCID: msg.ID,
3✔
2872
                                destRef:        &destRef,
3✔
2873
                                htlc:           msg,
3✔
2874
                        }
3✔
2875

3✔
2876
                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)
3✔
2877

3✔
2878
                        // If the failure message lacks an HMAC (but includes
3✔
2879
                        // the 4 bytes for encoding the message and padding
3✔
2880
                        // lengths, then this means that we received it as an
3✔
2881
                        // UpdateFailMalformedHTLC. As a result, we'll signal
3✔
2882
                        // that we need to convert this error within the switch
3✔
2883
                        // to an actual error, by encrypting it as if we were
3✔
2884
                        // the originating hop.
3✔
2885
                        convertedErrorSize := lnwire.FailureMessageLength + 4
3✔
2886
                        if len(msg.Reason) == convertedErrorSize {
6✔
2887
                                failPacket.convertedError = true
3✔
2888
                        }
3✔
2889

2890
                        // Add the packet to the batch to be forwarded, and
2891
                        // notify the overflow queue that a spare spot has been
2892
                        // freed up within the commitment state.
2893
                        switchPackets = append(switchPackets, failPacket)
3✔
2894
                }
2895
        }
2896

2897
        // Only spawn the task forward packets we have a non-zero number.
2898
        if len(switchPackets) > 0 {
6✔
2899
                go l.forwardBatch(false, switchPackets...)
3✔
2900
        }
3✔
2901
}
2902

2903
// processRemoteAdds serially processes each of the Add payment descriptors
2904
// which have been "locked-in" by receiving a revocation from the remote party.
2905
// The forwarding package provided instructs how to process this batch,
2906
// indicating whether this is the first time these Adds are being processed, or
2907
// whether we are reprocessing as a result of a failure or restart. Adds that
2908
// have already been acknowledged in the forwarding package will be ignored.
2909
//
2910
//nolint:funlen
2911
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
3✔
2912
        // Exit early if there are no adds to process.
3✔
2913
        if len(fwdPkg.Adds) == 0 {
6✔
2914
                l.log.Trace("fwd package has no adds to process exiting early")
3✔
2915

3✔
2916
                return
3✔
2917
        }
3✔
2918

2919
        // Exit early if the fwdPkg is already processed.
2920
        if fwdPkg.State == channeldb.FwdStateCompleted {
3✔
2921
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
×
2922

×
2923
                return
×
2924
        }
×
2925

2926
        l.log.Tracef("processing %d remote adds for height %d",
3✔
2927
                len(fwdPkg.Adds), fwdPkg.Height)
3✔
2928

3✔
2929
        // decodeReqs is a list of requests sent to the onion decoder. We expect
3✔
2930
        // the same length of responses to be returned.
3✔
2931
        decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))
3✔
2932

3✔
2933
        // unackedAdds is a list of ADDs that's waiting for the remote's
3✔
2934
        // settle/fail update.
3✔
2935
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))
3✔
2936

3✔
2937
        for i, update := range fwdPkg.Adds {
6✔
2938
                // If this index is already found in the ack filter, the
3✔
2939
                // response to this forwarding decision has already been
3✔
2940
                // committed by one of our commitment txns. ADDs in this state
3✔
2941
                // are waiting for the rest of the fwding package to get acked
3✔
2942
                // before being garbage collected.
3✔
2943
                if fwdPkg.State == channeldb.FwdStateProcessed &&
3✔
2944
                        fwdPkg.AckFilter.Contains(uint16(i)) {
3✔
2945

×
2946
                        continue
×
2947
                }
2948

2949
                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
6✔
2950
                        // Before adding the new htlc to the state machine,
3✔
2951
                        // parse the onion object in order to obtain the
3✔
2952
                        // routing information with DecodeHopIterator function
3✔
2953
                        // which process the Sphinx packet.
3✔
2954
                        onionReader := bytes.NewReader(msg.OnionBlob[:])
3✔
2955

3✔
2956
                        req := hop.DecodeHopIteratorRequest{
3✔
2957
                                OnionReader:    onionReader,
3✔
2958
                                RHash:          msg.PaymentHash[:],
3✔
2959
                                IncomingCltv:   msg.Expiry,
3✔
2960
                                IncomingAmount: msg.Amount,
3✔
2961
                                BlindingPoint:  msg.BlindingPoint,
3✔
2962
                        }
3✔
2963

3✔
2964
                        decodeReqs = append(decodeReqs, req)
3✔
2965
                        unackedAdds = append(unackedAdds, msg)
3✔
2966
                }
3✔
2967
        }
2968

2969
        // If the fwdPkg has already been processed, it means we are
2970
        // reforwarding the packets again, which happens only on a restart.
2971
        reforward := fwdPkg.State == channeldb.FwdStateProcessed
3✔
2972

3✔
2973
        // Atomically decode the incoming htlcs, simultaneously checking for
3✔
2974
        // replay attempts. A particular index in the returned, spare list of
3✔
2975
        // channel iterators should only be used if the failure code at the
3✔
2976
        // same index is lnwire.FailCodeNone.
3✔
2977
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
3✔
2978
                fwdPkg.ID(), decodeReqs, reforward,
3✔
2979
        )
3✔
2980
        if sphinxErr != nil {
3✔
2981
                l.failf(LinkFailureError{code: ErrInternalError},
×
2982
                        "unable to decode hop iterators: %v", sphinxErr)
×
2983
                return
×
2984
        }
×
2985

2986
        var switchPackets []*htlcPacket
3✔
2987

3✔
2988
        for i, update := range unackedAdds {
6✔
2989
                idx := uint16(i)
3✔
2990
                sourceRef := fwdPkg.SourceRef(idx)
3✔
2991
                add := *update
3✔
2992

3✔
2993
                // An incoming HTLC add has been full-locked in. As a result we
3✔
2994
                // can now examine the forwarding details of the HTLC, and the
3✔
2995
                // HTLC itself to decide if: we should forward it, cancel it,
3✔
2996
                // or are able to settle it (and it adheres to our fee related
3✔
2997
                // constraints).
3✔
2998

3✔
2999
                // Before adding the new htlc to the state machine, parse the
3✔
3000
                // onion object in order to obtain the routing information with
3✔
3001
                // DecodeHopIterator function which process the Sphinx packet.
3✔
3002
                chanIterator, failureCode := decodeResps[i].Result()
3✔
3003
                if failureCode != lnwire.CodeNone {
6✔
3004
                        // If we're unable to process the onion blob then we
3✔
3005
                        // should send the malformed htlc error to payment
3✔
3006
                        // sender.
3✔
3007
                        l.sendMalformedHTLCError(
3✔
3008
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
3✔
3009
                        )
3✔
3010

3✔
3011
                        l.log.Errorf("unable to decode onion hop iterator "+
3✔
3012
                                "for htlc(id=%v, hash=%x): %v", add.ID,
3✔
3013
                                add.PaymentHash, failureCode)
3✔
3014

3✔
3015
                        continue
3✔
3016
                }
3017

3018
                heightNow := l.cfg.BestHeight()
3✔
3019

3✔
3020
                pld, routeRole, pldErr := chanIterator.HopPayload()
3✔
3021
                if pldErr != nil {
6✔
3022
                        // If we're unable to process the onion payload, or we
3✔
3023
                        // received invalid onion payload failure, then we
3✔
3024
                        // should send an error back to the caller so the HTLC
3✔
3025
                        // can be canceled.
3✔
3026
                        var failedType uint64
3✔
3027

3✔
3028
                        // We need to get the underlying error value, so we
3✔
3029
                        // can't use errors.As as suggested by the linter.
3✔
3030
                        //nolint:errorlint
3✔
3031
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
3✔
3032
                                failedType = uint64(e.Type)
×
3033
                        }
×
3034

3035
                        // If we couldn't parse the payload, make our best
3036
                        // effort at creating an error encrypter that knows
3037
                        // what blinding type we were, but if we couldn't
3038
                        // parse the payload we have no way of knowing whether
3039
                        // we were the introduction node or not.
3040
                        //
3041
                        //nolint:ll
3042
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
3✔
3043
                                l.cfg.ExtractErrorEncrypter,
3✔
3044
                                // We need our route role here because we
3✔
3045
                                // couldn't parse or validate the payload.
3✔
3046
                                routeRole == hop.RouteRoleIntroduction,
3✔
3047
                        )
3✔
3048
                        if failCode != lnwire.CodeNone {
3✔
3049
                                l.log.Errorf("could not extract error "+
×
3050
                                        "encrypter: %v", pldErr)
×
3051

×
3052
                                // We can't process this htlc, send back
×
3053
                                // malformed.
×
3054
                                l.sendMalformedHTLCError(
×
3055
                                        add.ID, failureCode, add.OnionBlob,
×
3056
                                        &sourceRef,
×
3057
                                )
×
3058

×
3059
                                continue
×
3060
                        }
3061

3062
                        // TODO: currently none of the test unit infrastructure
3063
                        // is setup to handle TLV payloads, so testing this
3064
                        // would require implementing a separate mock iterator
3065
                        // for TLV payloads that also supports injecting invalid
3066
                        // payloads. Deferring this non-trival effort till a
3067
                        // later date
3068
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)
3✔
3069

3✔
3070
                        l.sendHTLCError(
3✔
3071
                                add, sourceRef, NewLinkError(failure),
3✔
3072
                                obfuscator, false,
3✔
3073
                        )
3✔
3074

3✔
3075
                        l.log.Errorf("unable to decode forwarding "+
3✔
3076
                                "instructions: %v", pldErr)
3✔
3077

3✔
3078
                        continue
3✔
3079
                }
3080

3081
                // Retrieve onion obfuscator from onion blob in order to
3082
                // produce initial obfuscation of the onion failureCode.
3083
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
3✔
3084
                        l.cfg.ExtractErrorEncrypter,
3✔
3085
                        routeRole == hop.RouteRoleIntroduction,
3✔
3086
                )
3✔
3087
                if failureCode != lnwire.CodeNone {
3✔
3088
                        // If we're unable to process the onion blob than we
×
3089
                        // should send the malformed htlc error to payment
×
3090
                        // sender.
×
3091
                        l.sendMalformedHTLCError(
×
3092
                                add.ID, failureCode, add.OnionBlob,
×
3093
                                &sourceRef,
×
3094
                        )
×
3095

×
3096
                        l.log.Errorf("unable to decode onion "+
×
3097
                                "obfuscator: %v", failureCode)
×
3098

×
3099
                        continue
×
3100
                }
3101

3102
                fwdInfo := pld.ForwardingInfo()
3✔
3103

3✔
3104
                // Check whether the payload we've just processed uses our
3✔
3105
                // node as the introduction point (gave us a blinding key in
3✔
3106
                // the payload itself) and fail it back if we don't support
3✔
3107
                // route blinding.
3✔
3108
                if fwdInfo.NextBlinding.IsSome() &&
3✔
3109
                        l.cfg.DisallowRouteBlinding {
6✔
3110

3✔
3111
                        failure := lnwire.NewInvalidBlinding(
3✔
3112
                                fn.Some(add.OnionBlob),
3✔
3113
                        )
3✔
3114

3✔
3115
                        l.sendHTLCError(
3✔
3116
                                add, sourceRef, NewLinkError(failure),
3✔
3117
                                obfuscator, false,
3✔
3118
                        )
3✔
3119

3✔
3120
                        l.log.Error("rejected htlc that uses use as an " +
3✔
3121
                                "introduction point when we do not support " +
3✔
3122
                                "route blinding")
3✔
3123

3✔
3124
                        continue
3✔
3125
                }
3126

3127
                switch fwdInfo.NextHop {
3✔
3128
                case hop.Exit:
3✔
3129
                        err := l.processExitHop(
3✔
3130
                                add, sourceRef, obfuscator, fwdInfo,
3✔
3131
                                heightNow, pld,
3✔
3132
                        )
3✔
3133
                        if err != nil {
3✔
3134
                                l.failf(LinkFailureError{
×
3135
                                        code: ErrInternalError,
×
3136
                                }, err.Error()) //nolint
×
3137

×
3138
                                return
×
3139
                        }
×
3140

3141
                // There are additional channels left within this route. So
3142
                // we'll simply do some forwarding package book-keeping.
3143
                default:
3✔
3144
                        // If hodl.AddIncoming is requested, we will not
3✔
3145
                        // validate the forwarded ADD, nor will we send the
3✔
3146
                        // packet to the htlc switch.
3✔
3147
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
3✔
3148
                                l.log.Warnf(hodl.AddIncoming.Warning())
×
3149
                                continue
×
3150
                        }
3151

3152
                        endorseValue := l.experimentalEndorsement(
3✔
3153
                                record.CustomSet(add.CustomRecords),
3✔
3154
                        )
3✔
3155
                        endorseType := uint64(
3✔
3156
                                lnwire.ExperimentalEndorsementType,
3✔
3157
                        )
3✔
3158

3✔
3159
                        switch fwdPkg.State {
3✔
3160
                        case channeldb.FwdStateProcessed:
3✔
3161
                                // This add was not forwarded on the previous
3✔
3162
                                // processing phase, run it through our
3✔
3163
                                // validation pipeline to reproduce an error.
3✔
3164
                                // This may trigger a different error due to
3✔
3165
                                // expiring timelocks, but we expect that an
3✔
3166
                                // error will be reproduced.
3✔
3167
                                if !fwdPkg.FwdFilter.Contains(idx) {
3✔
3168
                                        break
×
3169
                                }
3170

3171
                                // Otherwise, it was already processed, we can
3172
                                // can collect it and continue.
3173
                                outgoingAdd := &lnwire.UpdateAddHTLC{
3✔
3174
                                        Expiry:        fwdInfo.OutgoingCTLV,
3✔
3175
                                        Amount:        fwdInfo.AmountToForward,
3✔
3176
                                        PaymentHash:   add.PaymentHash,
3✔
3177
                                        BlindingPoint: fwdInfo.NextBlinding,
3✔
3178
                                }
3✔
3179

3✔
3180
                                endorseValue.WhenSome(func(e byte) {
6✔
3181
                                        custRecords := map[uint64][]byte{
3✔
3182
                                                endorseType: {e},
3✔
3183
                                        }
3✔
3184

3✔
3185
                                        outgoingAdd.CustomRecords = custRecords
3✔
3186
                                })
3✔
3187

3188
                                // Finally, we'll encode the onion packet for
3189
                                // the _next_ hop using the hop iterator
3190
                                // decoded for the current hop.
3191
                                buf := bytes.NewBuffer(
3✔
3192
                                        outgoingAdd.OnionBlob[0:0],
3✔
3193
                                )
3✔
3194

3✔
3195
                                // We know this cannot fail, as this ADD
3✔
3196
                                // was marked forwarded in a previous
3✔
3197
                                // round of processing.
3✔
3198
                                chanIterator.EncodeNextHop(buf)
3✔
3199

3✔
3200
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
3✔
3201

3✔
3202
                                //nolint:ll
3✔
3203
                                updatePacket := &htlcPacket{
3✔
3204
                                        incomingChanID:       l.ShortChanID(),
3✔
3205
                                        incomingHTLCID:       add.ID,
3✔
3206
                                        outgoingChanID:       fwdInfo.NextHop,
3✔
3207
                                        sourceRef:            &sourceRef,
3✔
3208
                                        incomingAmount:       add.Amount,
3✔
3209
                                        amount:               outgoingAdd.Amount,
3✔
3210
                                        htlc:                 outgoingAdd,
3✔
3211
                                        obfuscator:           obfuscator,
3✔
3212
                                        incomingTimeout:      add.Expiry,
3✔
3213
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
3✔
3214
                                        inOnionCustomRecords: pld.CustomRecords(),
3✔
3215
                                        inboundFee:           inboundFee,
3✔
3216
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
3✔
3217
                                }
3✔
3218
                                switchPackets = append(
3✔
3219
                                        switchPackets, updatePacket,
3✔
3220
                                )
3✔
3221

3✔
3222
                                continue
3✔
3223
                        }
3224

3225
                        // TODO(roasbeef): ensure don't accept outrageous
3226
                        // timeout for htlc
3227

3228
                        // With all our forwarding constraints met, we'll
3229
                        // create the outgoing HTLC using the parameters as
3230
                        // specified in the forwarding info.
3231
                        addMsg := &lnwire.UpdateAddHTLC{
3✔
3232
                                Expiry:        fwdInfo.OutgoingCTLV,
3✔
3233
                                Amount:        fwdInfo.AmountToForward,
3✔
3234
                                PaymentHash:   add.PaymentHash,
3✔
3235
                                BlindingPoint: fwdInfo.NextBlinding,
3✔
3236
                        }
3✔
3237

3✔
3238
                        endorseValue.WhenSome(func(e byte) {
6✔
3239
                                addMsg.CustomRecords = map[uint64][]byte{
3✔
3240
                                        endorseType: {e},
3✔
3241
                                }
3✔
3242
                        })
3✔
3243

3244
                        // Finally, we'll encode the onion packet for the
3245
                        // _next_ hop using the hop iterator decoded for the
3246
                        // current hop.
3247
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
3✔
3248
                        err := chanIterator.EncodeNextHop(buf)
3✔
3249
                        if err != nil {
3✔
3250
                                l.log.Errorf("unable to encode the "+
×
3251
                                        "remaining route %v", err)
×
3252

×
3253
                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
×
3254
                                        return lnwire.NewTemporaryChannelFailure(upd)
×
3255
                                }
×
3256

3257
                                failure := l.createFailureWithUpdate(
×
3258
                                        true, hop.Source, cb,
×
3259
                                )
×
3260

×
3261
                                l.sendHTLCError(
×
3262
                                        add, sourceRef, NewLinkError(failure),
×
3263
                                        obfuscator, false,
×
3264
                                )
×
3265
                                continue
×
3266
                        }
3267

3268
                        // Now that this add has been reprocessed, only append
3269
                        // it to our list of packets to forward to the switch
3270
                        // this is the first time processing the add. If the
3271
                        // fwd pkg has already been processed, then we entered
3272
                        // the above section to recreate a previous error.  If
3273
                        // the packet had previously been forwarded, it would
3274
                        // have been added to switchPackets at the top of this
3275
                        // section.
3276
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
6✔
3277
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
3✔
3278

3✔
3279
                                //nolint:ll
3✔
3280
                                updatePacket := &htlcPacket{
3✔
3281
                                        incomingChanID:       l.ShortChanID(),
3✔
3282
                                        incomingHTLCID:       add.ID,
3✔
3283
                                        outgoingChanID:       fwdInfo.NextHop,
3✔
3284
                                        sourceRef:            &sourceRef,
3✔
3285
                                        incomingAmount:       add.Amount,
3✔
3286
                                        amount:               addMsg.Amount,
3✔
3287
                                        htlc:                 addMsg,
3✔
3288
                                        obfuscator:           obfuscator,
3✔
3289
                                        incomingTimeout:      add.Expiry,
3✔
3290
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
3✔
3291
                                        inOnionCustomRecords: pld.CustomRecords(),
3✔
3292
                                        inboundFee:           inboundFee,
3✔
3293
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
3✔
3294
                                }
3✔
3295

3✔
3296
                                fwdPkg.FwdFilter.Set(idx)
3✔
3297
                                switchPackets = append(switchPackets,
3✔
3298
                                        updatePacket)
3✔
3299
                        }
3✔
3300
                }
3301
        }
3302

3303
        // Commit the htlcs we are intending to forward if this package has not
3304
        // been fully processed.
3305
        if fwdPkg.State == channeldb.FwdStateLockedIn {
6✔
3306
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
3✔
3307
                if err != nil {
3✔
3308
                        l.failf(LinkFailureError{code: ErrInternalError},
×
3309
                                "unable to set fwd filter: %v", err)
×
3310
                        return
×
3311
                }
×
3312
        }
3313

3314
        if len(switchPackets) == 0 {
6✔
3315
                return
3✔
3316
        }
3✔
3317

3318
        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
3✔
3319
                len(switchPackets), reforward)
3✔
3320

3✔
3321
        // NOTE: This call is made synchronous so that we ensure all circuits
3✔
3322
        // are committed in the exact order that they are processed in the link.
3✔
3323
        // Failing to do this could cause reorderings/gaps in the range of
3✔
3324
        // opened circuits, which violates assumptions made by the circuit
3✔
3325
        // trimming.
3✔
3326
        l.forwardBatch(reforward, switchPackets...)
3✔
3327
}
3328

3329
// experimentalEndorsement returns the value to set for our outgoing
3330
// experimental endorsement field, and a boolean indicating whether it should
3331
// be populated on the outgoing htlc.
3332
func (l *channelLink) experimentalEndorsement(
3333
        customUpdateAdd record.CustomSet) fn.Option[byte] {
3✔
3334

3✔
3335
        // Only relay experimental signal if we are within the experiment
3✔
3336
        // period.
3✔
3337
        if !l.cfg.ShouldFwdExpEndorsement() {
6✔
3338
                return fn.None[byte]()
3✔
3339
        }
3✔
3340

3341
        // If we don't have any custom records or the experimental field is
3342
        // not set, just forward a zero value.
3343
        if len(customUpdateAdd) == 0 {
6✔
3344
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
3✔
3345
        }
3✔
3346

3347
        t := uint64(lnwire.ExperimentalEndorsementType)
3✔
3348
        value, set := customUpdateAdd[t]
3✔
3349
        if !set {
3✔
3350
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
3351
        }
×
3352

3353
        // We expect at least one byte for this field, consider it invalid if
3354
        // it has no data and just forward a zero value.
3355
        if len(value) == 0 {
3✔
3356
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
3357
        }
×
3358

3359
        // Only forward endorsed if the incoming link is endorsed.
3360
        if value[0] == lnwire.ExperimentalEndorsed {
6✔
3361
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
3✔
3362
        }
3✔
3363

3364
        // Forward as unendorsed otherwise, including cases where we've
3365
        // received an invalid value that uses more than 3 bits of information.
3366
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
3✔
3367
}
3368

3369
// processExitHop handles an htlc for which this link is the exit hop. It
3370
// returns a boolean indicating whether the commitment tx needs an update.
3371
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
3372
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
3373
        fwdInfo hop.ForwardingInfo, heightNow uint32,
3374
        payload invoices.Payload) error {
3✔
3375

3✔
3376
        // If hodl.ExitSettle is requested, we will not validate the final hop's
3✔
3377
        // ADD, nor will we settle the corresponding invoice or respond with the
3✔
3378
        // preimage.
3✔
3379
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
6✔
3380
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
3✔
3381
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)
3✔
3382

3✔
3383
                return nil
3✔
3384
        }
3✔
3385

3386
        // In case the traffic shaper is active, we'll check if the HTLC has
3387
        // custom records and skip the amount check in the onion payload below.
3388
        isCustomHTLC := fn.MapOptionZ(
3✔
3389
                l.cfg.AuxTrafficShaper,
3✔
3390
                func(ts AuxTrafficShaper) bool {
3✔
3391
                        return ts.IsCustomHTLC(add.CustomRecords)
×
3392
                },
×
3393
        )
3394

3395
        // As we're the exit hop, we'll double check the hop-payload included in
3396
        // the HTLC to ensure that it was crafted correctly by the sender and
3397
        // is compatible with the HTLC we were extended. If an external
3398
        // validator is active we might bypass the amount check.
3399
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
3✔
3400
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
×
3401
                        "incompatible value: expected <=%v, got %v",
×
3402
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)
×
3403

×
3404
                failure := NewLinkError(
×
3405
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
×
3406
                )
×
3407
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
×
3408

×
3409
                return nil
×
3410
        }
×
3411

3412
        // We'll also ensure that our time-lock value has been computed
3413
        // correctly.
3414
        if add.Expiry < fwdInfo.OutgoingCTLV {
3✔
3415
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
×
3416
                        "incompatible time-lock: expected <=%v, got %v",
×
3417
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)
×
3418

×
3419
                failure := NewLinkError(
×
3420
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
×
3421
                )
×
3422

×
3423
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
×
3424

×
3425
                return nil
×
3426
        }
×
3427

3428
        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
3429
        // after this, this code will be re-executed after restart. We will
3430
        // receive back a resolution event.
3431
        invoiceHash := lntypes.Hash(add.PaymentHash)
3✔
3432

3✔
3433
        circuitKey := models.CircuitKey{
3✔
3434
                ChanID: l.ShortChanID(),
3✔
3435
                HtlcID: add.ID,
3✔
3436
        }
3✔
3437

3✔
3438
        event, err := l.cfg.Registry.NotifyExitHopHtlc(
3✔
3439
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
3✔
3440
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
3✔
3441
        )
3✔
3442
        if err != nil {
3✔
3443
                return err
×
3444
        }
×
3445

3446
        // Create a hodlHtlc struct and decide either resolved now or later.
3447
        htlc := hodlHtlc{
3✔
3448
                add:        add,
3✔
3449
                sourceRef:  sourceRef,
3✔
3450
                obfuscator: obfuscator,
3✔
3451
        }
3✔
3452

3✔
3453
        // If the event is nil, the invoice is being held, so we save payment
3✔
3454
        // descriptor for future reference.
3✔
3455
        if event == nil {
6✔
3456
                l.hodlMap[circuitKey] = htlc
3✔
3457
                return nil
3✔
3458
        }
3✔
3459

3460
        // Process the received resolution.
3461
        return l.processHtlcResolution(event, htlc)
3✔
3462
}
3463

3464
// settleHTLC settles the HTLC on the channel.
3465
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
3466
        htlcIndex uint64, sourceRef channeldb.AddRef) error {
3✔
3467

3✔
3468
        hash := preimage.Hash()
3✔
3469

3✔
3470
        l.log.Infof("settling htlc %v as exit hop", hash)
3✔
3471

3✔
3472
        err := l.channel.SettleHTLC(
3✔
3473
                preimage, htlcIndex, &sourceRef, nil, nil,
3✔
3474
        )
3✔
3475
        if err != nil {
3✔
3476
                return fmt.Errorf("unable to settle htlc: %w", err)
×
3477
        }
×
3478

3479
        // If the link is in hodl.BogusSettle mode, replace the preimage with a
3480
        // fake one before sending it to the peer.
3481
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
6✔
3482
                l.log.Warnf(hodl.BogusSettle.Warning())
3✔
3483
                preimage = [32]byte{}
3✔
3484
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
3✔
3485
        }
3✔
3486

3487
        // HTLC was successfully settled locally send notification about it
3488
        // remote peer.
3489
        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
3✔
3490
                ChanID:          l.ChanID(),
3✔
3491
                ID:              htlcIndex,
3✔
3492
                PaymentPreimage: preimage,
3✔
3493
        })
3✔
3494
        if err != nil {
3✔
NEW
3495
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
×
NEW
3496
        }
×
3497

3498
        // Once we have successfully settled the htlc, notify a settle event.
3499
        l.cfg.HtlcNotifier.NotifySettleEvent(
3✔
3500
                HtlcKey{
3✔
3501
                        IncomingCircuit: models.CircuitKey{
3✔
3502
                                ChanID: l.ShortChanID(),
3✔
3503
                                HtlcID: htlcIndex,
3✔
3504
                        },
3✔
3505
                },
3✔
3506
                preimage,
3✔
3507
                HtlcEventTypeReceive,
3✔
3508
        )
3✔
3509

3✔
3510
        return nil
3✔
3511
}
3512

3513
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
3514
// err chan for the individual responses. This method is intended to be spawned
3515
// as a goroutine so the responses can be handled in the background.
3516
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
3✔
3517
        // Don't forward packets for which we already have a response in our
3✔
3518
        // mailbox. This could happen if a packet fails and is buffered in the
3✔
3519
        // mailbox, and the incoming link flaps.
3✔
3520
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
3✔
3521
        for _, pkt := range packets {
6✔
3522
                if l.mailBox.HasPacket(pkt.inKey()) {
6✔
3523
                        continue
3✔
3524
                }
3525

3526
                filteredPkts = append(filteredPkts, pkt)
3✔
3527
        }
3528

3529
        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
3✔
3530
        if err != nil {
3✔
3531
                log.Errorf("Unhandled error while reforwarding htlc "+
×
3532
                        "settle/fail over htlcswitch: %v", err)
×
3533
        }
×
3534
}
3535

3536
// sendHTLCError functions cancels HTLC and send cancel message back to the
3537
// peer from which HTLC was received.
3538
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
3539
        sourceRef channeldb.AddRef, failure *LinkError,
3540
        e hop.ErrorEncrypter, isReceive bool) {
3✔
3541

3✔
3542
        reason, err := e.EncryptFirstHop(failure.WireMessage())
3✔
3543
        if err != nil {
3✔
3544
                l.log.Errorf("unable to obfuscate error: %v", err)
×
3545
                return
×
3546
        }
×
3547

3548
        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
3✔
3549
        if err != nil {
3✔
3550
                l.log.Errorf("unable cancel htlc: %v", err)
×
3551
                return
×
3552
        }
×
3553

3554
        // Send the appropriate failure message depending on whether we're
3555
        // in a blinded route or not.
3556
        if err := l.sendIncomingHTLCFailureMsg(
3✔
3557
                add.ID, e, reason,
3✔
3558
        ); err != nil {
3✔
3559
                l.log.Errorf("unable to send HTLC failure: %v", err)
×
3560
                return
×
3561
        }
×
3562

3563
        // Notify a link failure on our incoming link. Outgoing htlc information
3564
        // is not available at this point, because we have not decrypted the
3565
        // onion, so it is excluded.
3566
        var eventType HtlcEventType
3✔
3567
        if isReceive {
6✔
3568
                eventType = HtlcEventTypeReceive
3✔
3569
        } else {
6✔
3570
                eventType = HtlcEventTypeForward
3✔
3571
        }
3✔
3572

3573
        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
3✔
3574
                HtlcKey{
3✔
3575
                        IncomingCircuit: models.CircuitKey{
3✔
3576
                                ChanID: l.ShortChanID(),
3✔
3577
                                HtlcID: add.ID,
3✔
3578
                        },
3✔
3579
                },
3✔
3580
                HtlcInfo{
3✔
3581
                        IncomingTimeLock: add.Expiry,
3✔
3582
                        IncomingAmt:      add.Amount,
3✔
3583
                },
3✔
3584
                eventType,
3✔
3585
                failure,
3✔
3586
                true,
3✔
3587
        )
3✔
3588
}
3589

3590
// sendPeerHTLCFailure handles sending a HTLC failure message back to the
3591
// peer from which the HTLC was received. This function is primarily used to
3592
// handle the special requirements of route blinding, specifically:
3593
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
3594
// - Introduction nodes should return regular HTLC failure messages.
3595
//
3596
// It accepts the original opaque failure, which will be used in the case
3597
// that we're not part of a blinded route and an error encrypter that'll be
3598
// used if we are the introduction node and need to present an error as if
3599
// we're the failing party.
3600
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
3601
        e hop.ErrorEncrypter,
3602
        originalFailure lnwire.OpaqueReason) error {
3✔
3603

3✔
3604
        var msg lnwire.Message
3✔
3605
        switch {
3✔
3606
        // Our circuit's error encrypter will be nil if this was a locally
3607
        // initiated payment. We can only hit a blinded error for a locally
3608
        // initiated payment if we allow ourselves to be picked as the
3609
        // introduction node for our own payments and in that case we
3610
        // shouldn't reach this code. To prevent the HTLC getting stuck,
3611
        // we fail it back and log an error.
3612
        // code.
3613
        case e == nil:
×
3614
                msg = &lnwire.UpdateFailHTLC{
×
3615
                        ChanID: l.ChanID(),
×
3616
                        ID:     htlcIndex,
×
3617
                        Reason: originalFailure,
×
3618
                }
×
3619

×
3620
                l.log.Errorf("Unexpected blinded failure when "+
×
3621
                        "we are the sending node, incoming htlc: %v(%v)",
×
3622
                        l.ShortChanID(), htlcIndex)
×
3623

3624
        // For cleartext hops (ie, non-blinded/normal) we don't need any
3625
        // transformation on the error message and can just send the original.
3626
        case !e.Type().IsBlinded():
3✔
3627
                msg = &lnwire.UpdateFailHTLC{
3✔
3628
                        ChanID: l.ChanID(),
3✔
3629
                        ID:     htlcIndex,
3✔
3630
                        Reason: originalFailure,
3✔
3631
                }
3✔
3632

3633
        // When we're the introduction node, we need to convert the error to
3634
        // a UpdateFailHTLC.
3635
        case e.Type() == hop.EncrypterTypeIntroduction:
3✔
3636
                l.log.Debugf("Introduction blinded node switching out failure "+
3✔
3637
                        "error: %v", htlcIndex)
3✔
3638

3✔
3639
                // The specification does not require that we set the onion
3✔
3640
                // blob.
3✔
3641
                failureMsg := lnwire.NewInvalidBlinding(
3✔
3642
                        fn.None[[lnwire.OnionPacketSize]byte](),
3✔
3643
                )
3✔
3644
                reason, err := e.EncryptFirstHop(failureMsg)
3✔
3645
                if err != nil {
3✔
3646
                        return err
×
3647
                }
×
3648

3649
                msg = &lnwire.UpdateFailHTLC{
3✔
3650
                        ChanID: l.ChanID(),
3✔
3651
                        ID:     htlcIndex,
3✔
3652
                        Reason: reason,
3✔
3653
                }
3✔
3654

3655
        // If we are a relaying node, we need to switch out any error that
3656
        // we've received to a malformed HTLC error.
3657
        case e.Type() == hop.EncrypterTypeRelaying:
3✔
3658
                l.log.Debugf("Relaying blinded node switching out malformed "+
3✔
3659
                        "error: %v", htlcIndex)
3✔
3660

3✔
3661
                msg = &lnwire.UpdateFailMalformedHTLC{
3✔
3662
                        ChanID:      l.ChanID(),
3✔
3663
                        ID:          htlcIndex,
3✔
3664
                        FailureCode: lnwire.CodeInvalidBlinding,
3✔
3665
                }
3✔
3666

3667
        default:
×
3668
                return fmt.Errorf("unexpected encrypter: %d", e)
×
3669
        }
3670

3671
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
3✔
3672
                l.log.Warnf("Send update fail failed: %v", err)
×
3673
        }
×
3674

3675
        return nil
3✔
3676
}
3677

3678
// sendMalformedHTLCError helper function which sends the malformed HTLC update
3679
// to the payment sender.
3680
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
3681
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
3682
        sourceRef *channeldb.AddRef) {
3✔
3683

3✔
3684
        shaOnionBlob := sha256.Sum256(onionBlob[:])
3✔
3685
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
3✔
3686
        if err != nil {
3✔
3687
                l.log.Errorf("unable cancel htlc: %v", err)
×
3688
                return
×
3689
        }
×
3690

3691
        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
3✔
3692
                ChanID:       l.ChanID(),
3✔
3693
                ID:           htlcIndex,
3✔
3694
                ShaOnionBlob: shaOnionBlob,
3✔
3695
                FailureCode:  code,
3✔
3696
        })
3✔
3697
        if err != nil {
3✔
NEW
3698
                l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err)
×
NEW
3699
        }
×
3700
}
3701

3702
// failf is a function which is used to encapsulate the action necessary for
3703
// properly failing the link. It takes a LinkFailureError, which will be passed
3704
// to the OnChannelFailure closure, in order for it to determine if we should
3705
// force close the channel, and if we should send an error message to the
3706
// remote peer.
3707
func (l *channelLink) failf(linkErr LinkFailureError, format string,
3708
        a ...interface{}) {
3✔
3709

3✔
3710
        reason := fmt.Errorf(format, a...)
3✔
3711

3✔
3712
        // Return if we have already notified about a failure.
3✔
3713
        if l.failed {
6✔
3714
                l.log.Warnf("ignoring link failure (%v), as link already "+
3✔
3715
                        "failed", reason)
3✔
3716
                return
3✔
3717
        }
3✔
3718

3719
        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)
3✔
3720

3✔
3721
        // Set failed, such that we won't process any more updates, and notify
3✔
3722
        // the peer about the failure.
3✔
3723
        l.failed = true
3✔
3724
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
3✔
3725
}
3726

3727
// FundingCustomBlob returns the custom funding blob of the channel that this
3728
// link is associated with. The funding blob represents static information about
3729
// the channel that was created at channel funding time.
3730
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
×
3731
        if l.channel == nil {
×
3732
                return fn.None[tlv.Blob]()
×
3733
        }
×
3734

3735
        if l.channel.State() == nil {
×
3736
                return fn.None[tlv.Blob]()
×
3737
        }
×
3738

3739
        return l.channel.State().CustomBlob
×
3740
}
3741

3742
// CommitmentCustomBlob returns the custom blob of the current local commitment
3743
// of the channel that this link is associated with.
3744
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
×
3745
        if l.channel == nil {
×
3746
                return fn.None[tlv.Blob]()
×
3747
        }
×
3748

3749
        return l.channel.LocalCommitmentBlob()
×
3750
}
3751

3752
// handleHtlcResolution takes an HTLC resolution and processes it by draining
3753
// the hodlQueue. Once processed, a commit_sig is sent to the remote to update
3754
// their commitment.
3755
func (l *channelLink) handleHtlcResolution(ctx context.Context,
3756
        hodlItem any) error {
3✔
3757

3✔
3758
        htlcResolution, ok := hodlItem.(invoices.HtlcResolution)
3✔
3759
        if !ok {
3✔
NEW
3760
                return fmt.Errorf("expect HtlcResolution, got %T", hodlItem)
×
NEW
3761
        }
×
3762

3763
        err := l.processHodlQueue(ctx, htlcResolution)
3✔
3764
        // No error, success.
3✔
3765
        if err == nil {
6✔
3766
                return nil
3✔
3767
        }
3✔
3768

NEW
3769
        switch {
×
3770
        // If the duplicate keystone error was encountered, fail back
3771
        // gracefully.
NEW
3772
        case errors.Is(err, ErrDuplicateKeystone):
×
NEW
3773
                l.failf(
×
NEW
3774
                        LinkFailureError{
×
NEW
3775
                                code: ErrCircuitError,
×
NEW
3776
                        },
×
NEW
3777
                        "process hodl queue: temporary circuit error: %v", err,
×
NEW
3778
                )
×
3779

3780
        // Send an Error message to the peer.
NEW
3781
        default:
×
NEW
3782
                l.failf(
×
NEW
3783
                        LinkFailureError{
×
NEW
3784
                                code: ErrInternalError,
×
NEW
3785
                        },
×
NEW
3786
                        "process hodl queue: unable to update commitment: %v",
×
NEW
3787
                        err,
×
NEW
3788
                )
×
3789
        }
3790

NEW
3791
        return err
×
3792
}
3793

3794
// handleQuiescenceReq takes a locally initialized (RPC) quiescence request and
3795
// forwards it to the quiescer for further processing.
3796
func (l *channelLink) handleQuiescenceReq(req StfuReq) error {
3✔
3797
        l.quiescer.InitStfu(req)
3✔
3798

3✔
3799
        if !l.noDanglingUpdates(lntypes.Local) {
3✔
NEW
3800
                return nil
×
NEW
3801
        }
×
3802

3803
        err := l.quiescer.SendOwedStfu()
3✔
3804
        if err != nil {
3✔
NEW
3805
                l.stfuFailf("SendOwedStfu: %s", err.Error())
×
NEW
3806
                res := fn.Err[lntypes.ChannelParty](err)
×
NEW
3807
                req.Resolve(res)
×
NEW
3808
        }
×
3809

3810
        return err
3✔
3811
}
3812

3813
// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to
3814
// decide whether we should send an `update_fee` msg to update the commitment's
3815
// feerate.
NEW
3816
func (l *channelLink) handleUpdateFee(ctx context.Context) error {
×
NEW
3817
        // If we're not the initiator of the channel, we don't control the fees,
×
NEW
3818
        // so we can ignore this.
×
NEW
3819
        if !l.channel.IsInitiator() {
×
NEW
3820
                return nil
×
NEW
3821
        }
×
3822

3823
        // If we are the initiator, then we'll sample the current fee rate to
3824
        // get into the chain within 3 blocks.
NEW
3825
        netFee, err := l.sampleNetworkFee()
×
NEW
3826
        if err != nil {
×
NEW
3827
                return fmt.Errorf("unable to sample network fee: %w", err)
×
NEW
3828
        }
×
3829

NEW
3830
        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
×
NEW
3831

×
NEW
3832
        newCommitFee := l.channel.IdealCommitFeeRate(
×
NEW
3833
                netFee, minRelayFee,
×
NEW
3834
                l.cfg.MaxAnchorsCommitFeeRate,
×
NEW
3835
                l.cfg.MaxFeeAllocation,
×
NEW
3836
        )
×
NEW
3837

×
NEW
3838
        // We determine if we should adjust the commitment fee based on the
×
NEW
3839
        // current commitment fee, the suggested new commitment fee and the
×
NEW
3840
        // current minimum relay fee rate.
×
NEW
3841
        commitFee := l.channel.CommitFeeRate()
×
NEW
3842
        if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) {
×
NEW
3843
                return nil
×
NEW
3844
        }
×
3845

3846
        // If we do, then we'll send a new UpdateFee message to the remote
3847
        // party, to be locked in with a new update.
NEW
3848
        err = l.updateChannelFee(ctx, newCommitFee)
×
NEW
3849
        if err != nil {
×
NEW
3850
                return fmt.Errorf("unable to update fee rate: %w", err)
×
NEW
3851
        }
×
3852

NEW
3853
        return nil
×
3854
}
3855

3856
// toggleBatchTicker checks whether we need to resume or pause the batch ticker.
3857
// When we have no pending updates, the ticker is paused, otherwise resumed.
3858
func (l *channelLink) toggleBatchTicker() {
3✔
3859
        // If the previous event resulted in a non-empty batch, resume the batch
3✔
3860
        // ticker so that it can be cleared. Otherwise pause the ticker to
3✔
3861
        // prevent waking up the htlcManager while the batch is empty.
3✔
3862
        numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
3✔
3863
        if numUpdates > 0 {
6✔
3864
                l.cfg.BatchTicker.Resume()
3✔
3865
                l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+
3✔
3866
                        "Remote)=%d", numUpdates)
3✔
3867

3✔
3868
                return
3✔
3869
        }
3✔
3870

3871
        l.cfg.BatchTicker.Pause()
3✔
3872
        l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
3✔
3873
                "(Local, Remote)")
3✔
3874
}
3875

3876
// resumeLink is called when starting a previous link. It will go through the
3877
// reestablishment protocol and reforwarding packets that are yet resolved.
3878
func (l *channelLink) resumeLink(ctx context.Context) error {
3✔
3879
        // If this isn't the first time that this channel link has been created,
3✔
3880
        // then we'll need to check to see if we need to re-synchronize state
3✔
3881
        // with the remote peer. settledHtlcs is a map of HTLC's that we
3✔
3882
        // re-settled as part of the channel state sync.
3✔
3883
        if l.cfg.SyncStates {
6✔
3884
                err := l.syncChanStates(ctx)
3✔
3885
                if err != nil {
6✔
3886
                        l.handleChanSyncErr(err)
3✔
3887

3✔
3888
                        return err
3✔
3889
                }
3✔
3890
        }
3891

3892
        // If a shutdown message has previously been sent on this link, then we
3893
        // need to make sure that we have disabled any HTLC adds on the outgoing
3894
        // direction of the link and that we re-resend the same shutdown message
3895
        // that we previously sent.
3896
        //
3897
        // TODO(yy): we should either move this to chanCloser, or move all
3898
        // shutdown handling logic to be managed by the link, but not a mixed of
3899
        // partial management by two subsystems.
3900
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
6✔
3901
                // Immediately disallow any new outgoing HTLCs.
3✔
3902
                if !l.DisableAdds(Outgoing) {
3✔
NEW
3903
                        l.log.Warnf("Outgoing link adds already disabled")
×
NEW
3904
                }
×
3905

3906
                // Re-send the shutdown message the peer. Since syncChanStates
3907
                // would have sent any outstanding CommitSig, it is fine for us
3908
                // to immediately queue the shutdown message now.
3909
                err := l.cfg.Peer.SendMessage(false, &shutdown)
3✔
3910
                if err != nil {
3✔
NEW
3911
                        l.log.Warnf("Error sending shutdown message: %v", err)
×
NEW
3912
                }
×
3913
        })
3914

3915
        // We've successfully reestablished the channel, mark it as such to
3916
        // allow the switch to forward HTLCs in the outbound direction.
3917
        l.markReestablished()
3✔
3918

3✔
3919
        // With the channel states synced, we now reset the mailbox to ensure we
3✔
3920
        // start processing all unacked packets in order. This is done here to
3✔
3921
        // ensure that all acknowledgments that occur during channel
3✔
3922
        // resynchronization have taken affect, causing us only to pull unacked
3✔
3923
        // packets after starting to read from the downstream mailbox.
3✔
3924
        err := l.mailBox.ResetPackets()
3✔
3925
        if err != nil {
3✔
NEW
3926
                l.log.Errorf("failed to reset packets: %v", err)
×
NEW
3927
        }
×
3928

3929
        // If the channel is pending, there's no need to reforwarding packets.
3930
        if l.ShortChanID() == hop.Source {
3✔
NEW
3931
                return nil
×
NEW
3932
        }
×
3933

3934
        // After cleaning up any memory pertaining to incoming packets, we now
3935
        // replay our forwarding packages to handle any htlcs that can be
3936
        // processed locally, or need to be forwarded out to the switch. We will
3937
        // only attempt to resolve packages if our short chan id indicates that
3938
        // the channel is not pending, otherwise we should have no htlcs to
3939
        // reforward.
3940
        err = l.resolveFwdPkgs(ctx)
3✔
3941

3✔
3942
        // No error was encountered, success.
3✔
3943
        if err == nil {
6✔
3944
                return nil
3✔
3945
        }
3✔
3946

NEW
3947
        switch {
×
3948
        // If the duplicate keystone error was encountered, we'll fail without
3949
        // sending an Error message to the peer.
NEW
3950
        case errors.Is(err, ErrDuplicateKeystone):
×
NEW
3951
                l.failf(LinkFailureError{code: ErrCircuitError},
×
NEW
3952
                        "temporary circuit error: %v", err)
×
3953

3954
        // A non-nil error was encountered, send an Error message to the peer.
NEW
3955
        default:
×
NEW
3956
                l.failf(LinkFailureError{code: ErrInternalError},
×
NEW
3957
                        "unable to resolve fwd pkgs: %v", err)
×
3958
        }
3959

3960
        // With our link's in-memory state fully reconstructed, spawn a
3961
        // goroutine to manage the reclamation of disk space occupied by
3962
        // completed forwarding packages.
NEW
3963
        l.cg.WgAdd(1)
×
NEW
3964
        go l.fwdPkgGarbager()
×
NEW
3965

×
NEW
3966
        return err
×
3967
}
3968

3969
// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote
3970
// and processes it.
3971
func (l *channelLink) processRemoteUpdateAddHTLC(
3972
        msg *lnwire.UpdateAddHTLC) error {
3✔
3973

3✔
3974
        if l.IsFlushing(Incoming) {
3✔
NEW
3975
                // This is forbidden by the protocol specification. The best
×
NEW
3976
                // chance we have to deal with this is to drop the connection.
×
NEW
3977
                // This should roll back the channel state to the last
×
NEW
3978
                // CommitSig. If the remote has already sent a CommitSig we
×
NEW
3979
                // haven't received yet, channel state will be re-synchronized
×
NEW
3980
                // with a ChannelReestablish message upon reconnection and the
×
NEW
3981
                // protocol state that caused us to flush the link will be
×
NEW
3982
                // rolled back. In the event that there was some
×
NEW
3983
                // non-deterministic behavior in the remote that caused them to
×
NEW
3984
                // violate the protocol, we have a decent shot at correcting it
×
NEW
3985
                // this way, since reconnecting will put us in the cleanest
×
NEW
3986
                // possible state to try again.
×
NEW
3987
                //
×
NEW
3988
                // In addition to the above, it is possible for us to hit this
×
NEW
3989
                // case in situations where we improperly handle message
×
NEW
3990
                // ordering due to concurrency choices. An issue has been filed
×
NEW
3991
                // to address this here:
×
NEW
3992
                // https://github.com/lightningnetwork/lnd/issues/8393
×
NEW
3993
                err := errors.New("received add while link is flushing")
×
NEW
3994
                l.failf(
×
NEW
3995
                        LinkFailureError{
×
NEW
3996
                                code:             ErrInvalidUpdate,
×
NEW
3997
                                FailureAction:    LinkFailureDisconnect,
×
NEW
3998
                                PermanentFailure: false,
×
NEW
3999
                                Warning:          true,
×
NEW
4000
                        }, err.Error(),
×
NEW
4001
                )
×
NEW
4002

×
NEW
4003
                return err
×
NEW
4004
        }
×
4005

4006
        // Disallow htlcs with blinding points set if we haven't enabled the
4007
        // feature. This saves us from having to process the onion at all, but
4008
        // will only catch blinded payments where we are a relaying node (as the
4009
        // blinding point will be in the payload when we're the introduction
4010
        // node).
4011
        if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
3✔
NEW
4012
                err := errors.New("blinding point included when route " +
×
NEW
4013
                        "blinding is disabled")
×
NEW
4014

×
NEW
4015
                l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error())
×
NEW
4016

×
NEW
4017
                return err
×
NEW
4018
        }
×
4019

4020
        // We have to check the limit here rather than later in the switch
4021
        // because the counterparty can keep sending HTLC's without sending a
4022
        // revoke. This would mean that the switch check would only occur later.
4023
        if l.isOverexposedWithHtlc(msg, true) {
3✔
NEW
4024
                err := errors.New("peer sent us an HTLC that exceeded our " +
×
NEW
4025
                        "max fee exposure")
×
NEW
4026
                l.failf(LinkFailureError{code: ErrInternalError}, err.Error())
×
NEW
4027

×
NEW
4028
                return err
×
NEW
4029
        }
×
4030

4031
        // We just received an add request from an upstream peer, so we add it
4032
        // to our state machine, then add the HTLC to our "settle" list in the
4033
        // event that we know the preimage.
4034
        index, err := l.channel.ReceiveHTLC(msg)
3✔
4035
        if err != nil {
3✔
NEW
4036
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
NEW
4037
                        "unable to handle upstream add HTLC: %v", err)
×
NEW
4038

×
NEW
4039
                return err
×
NEW
4040
        }
×
4041

4042
        l.log.Tracef("receive upstream htlc with payment hash(%x), "+
3✔
4043
                "assigning index: %v", msg.PaymentHash[:], index)
3✔
4044

3✔
4045
        return nil
3✔
4046
}
4047

4048
// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the
4049
// remote and processes it.
4050
func (l *channelLink) processRemoteUpdateFulfillHTLC(
4051
        msg *lnwire.UpdateFulfillHTLC) error {
3✔
4052

3✔
4053
        pre := msg.PaymentPreimage
3✔
4054
        idx := msg.ID
3✔
4055

3✔
4056
        // Before we pipeline the settle, we'll check the set of active htlc's
3✔
4057
        // to see if the related UpdateAddHTLC has been fully locked-in.
3✔
4058
        var lockedin bool
3✔
4059
        htlcs := l.channel.ActiveHtlcs()
3✔
4060
        for _, add := range htlcs {
6✔
4061
                // The HTLC will be outgoing and match idx.
3✔
4062
                if !add.Incoming && add.HtlcIndex == idx {
6✔
4063
                        lockedin = true
3✔
4064
                        break
3✔
4065
                }
4066
        }
4067

4068
        if !lockedin {
3✔
NEW
4069
                err := errors.New("unable to handle upstream settle")
×
NEW
4070
                l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error())
×
NEW
4071

×
NEW
4072
                return err
×
NEW
4073
        }
×
4074

4075
        if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
6✔
4076
                l.failf(
3✔
4077
                        LinkFailureError{
3✔
4078
                                code:          ErrInvalidUpdate,
3✔
4079
                                FailureAction: LinkFailureForceClose,
3✔
4080
                        },
3✔
4081
                        "unable to handle upstream settle HTLC: %v", err,
3✔
4082
                )
3✔
4083

3✔
4084
                return err
3✔
4085
        }
3✔
4086

4087
        settlePacket := &htlcPacket{
3✔
4088
                outgoingChanID: l.ShortChanID(),
3✔
4089
                outgoingHTLCID: idx,
3✔
4090
                htlc: &lnwire.UpdateFulfillHTLC{
3✔
4091
                        PaymentPreimage: pre,
3✔
4092
                },
3✔
4093
        }
3✔
4094

3✔
4095
        // Add the newly discovered preimage to our growing list of uncommitted
3✔
4096
        // preimage. These will be written to the witness cache just before
3✔
4097
        // accepting the next commitment signature from the remote peer.
3✔
4098
        l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
3✔
4099

3✔
4100
        // Pipeline this settle, send it to the switch.
3✔
4101
        go l.forwardBatch(false, settlePacket)
3✔
4102

3✔
4103
        return nil
3✔
4104
}
4105

4106
// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg
4107
// sent from the remote and processes it.
4108
func (l *channelLink) processRemoteUpdateFailMalformedHTLC(
4109
        msg *lnwire.UpdateFailMalformedHTLC) error {
3✔
4110

3✔
4111
        // Convert the failure type encoded within the HTLC fail message to the
3✔
4112
        // proper generic lnwire error code.
3✔
4113
        var failure lnwire.FailureMessage
3✔
4114
        switch msg.FailureCode {
3✔
4115
        case lnwire.CodeInvalidOnionVersion:
3✔
4116
                failure = &lnwire.FailInvalidOnionVersion{
3✔
4117
                        OnionSHA256: msg.ShaOnionBlob,
3✔
4118
                }
3✔
NEW
4119
        case lnwire.CodeInvalidOnionHmac:
×
NEW
4120
                failure = &lnwire.FailInvalidOnionHmac{
×
NEW
4121
                        OnionSHA256: msg.ShaOnionBlob,
×
NEW
4122
                }
×
4123

NEW
4124
        case lnwire.CodeInvalidOnionKey:
×
NEW
4125
                failure = &lnwire.FailInvalidOnionKey{
×
NEW
4126
                        OnionSHA256: msg.ShaOnionBlob,
×
NEW
4127
                }
×
4128

4129
        // Handle malformed errors that are part of a blinded route. This case
4130
        // is slightly different, because we expect every relaying node in the
4131
        // blinded portion of the route to send malformed errors. If we're also
4132
        // a relaying node, we're likely going to switch this error out anyway
4133
        // for our own malformed error, but we handle the case here for
4134
        // completeness.
4135
        case lnwire.CodeInvalidBlinding:
3✔
4136
                failure = &lnwire.FailInvalidBlinding{
3✔
4137
                        OnionSHA256: msg.ShaOnionBlob,
3✔
4138
                }
3✔
4139

NEW
4140
        default:
×
NEW
4141
                l.log.Warnf("unexpected failure code received in "+
×
NEW
4142
                        "UpdateFailMailformedHTLC: %v", msg.FailureCode)
×
NEW
4143

×
NEW
4144
                // We don't just pass back the error we received from our
×
NEW
4145
                // successor. Otherwise we might report a failure that penalizes
×
NEW
4146
                // us more than needed. If the onion that we forwarded was
×
NEW
4147
                // correct, the node should have been able to send back its own
×
NEW
4148
                // failure. The node did not send back its own failure, so we
×
NEW
4149
                // assume there was a problem with the onion and report that
×
NEW
4150
                // back. We reuse the invalid onion key failure because there is
×
NEW
4151
                // no specific error for this case.
×
NEW
4152
                failure = &lnwire.FailInvalidOnionKey{
×
NEW
4153
                        OnionSHA256: msg.ShaOnionBlob,
×
NEW
4154
                }
×
4155
        }
4156

4157
        // With the error parsed, we'll convert the into it's opaque form.
4158
        var b bytes.Buffer
3✔
4159
        if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
3✔
NEW
4160
                return fmt.Errorf("unable to encode malformed error: %w", err)
×
NEW
4161
        }
×
4162

4163
        // If remote side have been unable to parse the onion blob we have sent
4164
        // to it, than we should transform the malformed HTLC message to the
4165
        // usual HTLC fail message.
4166
        err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
3✔
4167
        if err != nil {
3✔
NEW
4168
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
NEW
4169
                        "unable to handle upstream fail HTLC: %v", err)
×
NEW
4170

×
NEW
4171
                return err
×
NEW
4172
        }
×
4173

4174
        return nil
3✔
4175
}
4176

4177
// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the
4178
// remote and processes it.
4179
func (l *channelLink) processRemoteUpdateFailHTLC(
4180
        msg *lnwire.UpdateFailHTLC) error {
3✔
4181

3✔
4182
        // Verify that the failure reason is at least 256 bytes plus overhead.
3✔
4183
        const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32
3✔
4184

3✔
4185
        if len(msg.Reason) < minimumFailReasonLength {
3✔
NEW
4186
                // We've received a reason with a non-compliant length. Older
×
NEW
4187
                // nodes happily relay back these failures that may originate
×
NEW
4188
                // from a node further downstream. Therefore we can't just fail
×
NEW
4189
                // the channel.
×
NEW
4190
                //
×
NEW
4191
                // We want to be compliant ourselves, so we also can't pass back
×
NEW
4192
                // the reason unmodified. And we must make sure that we don't
×
NEW
4193
                // hit the magic length check of 260 bytes in
×
NEW
4194
                // processRemoteSettleFails either.
×
NEW
4195
                //
×
NEW
4196
                // Because the reason is unreadable for the payer anyway, we
×
NEW
4197
                // just replace it by a compliant-length series of random bytes.
×
NEW
4198
                msg.Reason = make([]byte, minimumFailReasonLength)
×
NEW
4199
                _, err := crand.Read(msg.Reason[:])
×
NEW
4200
                if err != nil {
×
NEW
4201
                        return fmt.Errorf("random generation error: %w", err)
×
NEW
4202
                }
×
4203
        }
4204

4205
        // Add fail to the update log.
4206
        idx := msg.ID
3✔
4207
        err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
3✔
4208
        if err != nil {
3✔
NEW
4209
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
NEW
4210
                        "unable to handle upstream fail HTLC: %v", err)
×
NEW
4211

×
NEW
4212
                return err
×
NEW
4213
        }
×
4214

4215
        return nil
3✔
4216
}
4217

4218
// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and
4219
// processes it.
4220
func (l *channelLink) processRemoteCommitSig(ctx context.Context,
4221
        msg *lnwire.CommitSig) error {
3✔
4222

3✔
4223
        // Since we may have learned new preimages for the first time, we'll add
3✔
4224
        // them to our preimage cache. By doing this, we ensure any contested
3✔
4225
        // contracts watched by any on-chain arbitrators can now sweep this HTLC
3✔
4226
        // on-chain. We delay committing the preimages until just before
3✔
4227
        // accepting the new remote commitment, as afterwards the peer won't
3✔
4228
        // resend the Settle messages on the next channel reestablishment. Doing
3✔
4229
        // so allows us to more effectively batch this operation, instead of
3✔
4230
        // doing a single write per preimage.
3✔
4231
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
3✔
4232
        if err != nil {
3✔
NEW
4233
                l.failf(
×
NEW
4234
                        LinkFailureError{code: ErrInternalError},
×
NEW
4235
                        "unable to add preimages=%v to cache: %v",
×
NEW
4236
                        l.uncommittedPreimages, err,
×
NEW
4237
                )
×
NEW
4238

×
NEW
4239
                return err
×
NEW
4240
        }
×
4241

4242
        // Instead of truncating the slice to conserve memory allocations, we
4243
        // simply set the uncommitted preimage slice to nil so that a new one
4244
        // will be initialized if any more witnesses are discovered. We do this
4245
        // because the maximum size that the slice can occupy is 15KB, and we
4246
        // want to ensure we release that memory back to the runtime.
4247
        l.uncommittedPreimages = nil
3✔
4248

3✔
4249
        // We just received a new updates to our local commitment chain,
3✔
4250
        // validate this new commitment, closing the link if invalid.
3✔
4251
        auxSigBlob, err := msg.CustomRecords.Serialize()
3✔
4252
        if err != nil {
3✔
NEW
4253
                l.failf(
×
NEW
4254
                        LinkFailureError{code: ErrInvalidCommitment},
×
NEW
4255
                        "unable to serialize custom records: %v", err,
×
NEW
4256
                )
×
NEW
4257

×
NEW
4258
                return err
×
NEW
4259
        }
×
4260
        err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
3✔
4261
                CommitSig:  msg.CommitSig,
3✔
4262
                HtlcSigs:   msg.HtlcSigs,
3✔
4263
                PartialSig: msg.PartialSig,
3✔
4264
                AuxSigBlob: auxSigBlob,
3✔
4265
        })
3✔
4266
        if err != nil {
3✔
NEW
4267
                // If we were unable to reconstruct their proposed commitment,
×
NEW
4268
                // then we'll examine the type of error. If it's an
×
NEW
4269
                // InvalidCommitSigError, then we'll send a direct error.
×
NEW
4270
                var sendData []byte
×
NEW
4271
                switch {
×
NEW
4272
                case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err):
×
NEW
4273
                        sendData = []byte(err.Error())
×
NEW
4274
                case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err):
×
NEW
4275
                        sendData = []byte(err.Error())
×
4276
                }
NEW
4277
                l.failf(
×
NEW
4278
                        LinkFailureError{
×
NEW
4279
                                code:          ErrInvalidCommitment,
×
NEW
4280
                                FailureAction: LinkFailureForceClose,
×
NEW
4281
                                SendData:      sendData,
×
NEW
4282
                        },
×
NEW
4283
                        "ChannelPoint(%v): unable to accept new "+
×
NEW
4284
                                "commitment: %v",
×
NEW
4285
                        l.channel.ChannelPoint(), err,
×
NEW
4286
                )
×
NEW
4287

×
NEW
4288
                return err
×
4289
        }
4290

4291
        // As we've just accepted a new state, we'll now immediately send the
4292
        // remote peer a revocation for our prior state.
4293
        nextRevocation, currentHtlcs, finalHTLCs, err :=
3✔
4294
                l.channel.RevokeCurrentCommitment()
3✔
4295
        if err != nil {
3✔
NEW
4296
                l.log.Errorf("unable to revoke commitment: %v", err)
×
NEW
4297

×
NEW
4298
                // We need to fail the channel in case revoking our local
×
NEW
4299
                // commitment does not succeed. We might have already advanced
×
NEW
4300
                // our channel state which would lead us to proceed with an
×
NEW
4301
                // unclean state.
×
NEW
4302
                //
×
NEW
4303
                // NOTE: We do not trigger a force close because this could
×
NEW
4304
                // resolve itself in case our db was just busy not accepting new
×
NEW
4305
                // transactions.
×
NEW
4306
                l.failf(
×
NEW
4307
                        LinkFailureError{
×
NEW
4308
                                code:          ErrInternalError,
×
NEW
4309
                                Warning:       true,
×
NEW
4310
                                FailureAction: LinkFailureDisconnect,
×
NEW
4311
                        },
×
NEW
4312
                        "ChannelPoint(%v): unable to accept new "+
×
NEW
4313
                                "commitment: %v",
×
NEW
4314
                        l.channel.ChannelPoint(), err,
×
NEW
4315
                )
×
NEW
4316

×
NEW
4317
                return err
×
NEW
4318
        }
×
4319

4320
        // As soon as we are ready to send our next revocation, we can invoke
4321
        // the incoming commit hooks.
4322
        l.Lock()
3✔
4323
        l.incomingCommitHooks.invoke()
3✔
4324
        l.Unlock()
3✔
4325

3✔
4326
        err = l.cfg.Peer.SendMessage(false, nextRevocation)
3✔
4327
        if err != nil {
3✔
NEW
4328
                l.log.Errorf("failed to send RevokeAndAck: %v", err)
×
NEW
4329
        }
×
4330

4331
        // Notify the incoming htlcs of which the resolutions were locked in.
4332
        for id, settled := range finalHTLCs {
6✔
4333
                l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
3✔
4334
                        models.CircuitKey{
3✔
4335
                                ChanID: l.ShortChanID(),
3✔
4336
                                HtlcID: id,
3✔
4337
                        },
3✔
4338
                        channeldb.FinalHtlcInfo{
3✔
4339
                                Settled:  settled,
3✔
4340
                                Offchain: true,
3✔
4341
                        },
3✔
4342
                )
3✔
4343
        }
3✔
4344

4345
        // Since we just revoked our commitment, we may have a new set of HTLC's
4346
        // on our commitment, so we'll send them using our function closure
4347
        // NotifyContractUpdate.
4348
        newUpdate := &contractcourt.ContractUpdate{
3✔
4349
                HtlcKey: contractcourt.LocalHtlcSet,
3✔
4350
                Htlcs:   currentHtlcs,
3✔
4351
        }
3✔
4352
        err = l.cfg.NotifyContractUpdate(newUpdate)
3✔
4353
        if err != nil {
3✔
NEW
4354
                return fmt.Errorf("unable to notify contract update: %w", err)
×
NEW
4355
        }
×
4356

4357
        select {
3✔
NEW
4358
        case <-l.cg.Done():
×
NEW
4359
                return nil
×
4360
        default:
3✔
4361
        }
4362

4363
        // If the remote party initiated the state transition, we'll reply with
4364
        // a signature to provide them with their version of the latest
4365
        // commitment. Otherwise, both commitment chains are fully synced from
4366
        // our PoV, then we don't need to reply with a signature as both sides
4367
        // already have a commitment with the latest accepted.
4368
        if l.channel.OweCommitment() {
6✔
4369
                if !l.updateCommitTxOrFail(ctx) {
3✔
NEW
4370
                        return nil
×
NEW
4371
                }
×
4372
        }
4373

4374
        // If we need to send out an Stfu, this would be the time to do so.
4375
        if l.noDanglingUpdates(lntypes.Local) {
6✔
4376
                err = l.quiescer.SendOwedStfu()
3✔
4377
                if err != nil {
3✔
NEW
4378
                        l.stfuFailf("sendOwedStfu: %v", err.Error())
×
NEW
4379
                }
×
4380
        }
4381

4382
        // Now that we have finished processing the incoming CommitSig and sent
4383
        // out our RevokeAndAck, we invoke the flushHooks if the channel state
4384
        // is clean.
4385
        l.Lock()
3✔
4386
        if l.channel.IsChannelClean() {
6✔
4387
                l.flushHooks.invoke()
3✔
4388
        }
3✔
4389
        l.Unlock()
3✔
4390

3✔
4391
        return nil
3✔
4392
}
4393

4394
// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and
4395
// processes it.
4396
func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context,
4397
        msg *lnwire.RevokeAndAck) error {
3✔
4398

3✔
4399
        // We've received a revocation from the remote chain, if valid, this
3✔
4400
        // moves the remote chain forward, and expands our revocation window.
3✔
4401

3✔
4402
        // We now process the message and advance our remote commit chain.
3✔
4403
        fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
3✔
4404
        if err != nil {
3✔
NEW
4405
                // TODO(halseth): force close?
×
NEW
4406
                l.failf(
×
NEW
4407
                        LinkFailureError{
×
NEW
4408
                                code:          ErrInvalidRevocation,
×
NEW
4409
                                FailureAction: LinkFailureDisconnect,
×
NEW
4410
                        },
×
NEW
4411
                        "unable to accept revocation: %v", err,
×
NEW
4412
                )
×
NEW
4413

×
NEW
4414
                return err
×
NEW
4415
        }
×
4416

4417
        // The remote party now has a new primary commitment, so we'll update
4418
        // the contract court to be aware of this new set (the prior old remote
4419
        // pending).
4420
        newUpdate := &contractcourt.ContractUpdate{
3✔
4421
                HtlcKey: contractcourt.RemoteHtlcSet,
3✔
4422
                Htlcs:   remoteHTLCs,
3✔
4423
        }
3✔
4424
        err = l.cfg.NotifyContractUpdate(newUpdate)
3✔
4425
        if err != nil {
3✔
NEW
4426
                return fmt.Errorf("unable to notify contract update: %w", err)
×
NEW
4427
        }
×
4428

4429
        select {
3✔
NEW
4430
        case <-l.cg.Done():
×
NEW
4431
                return nil
×
4432
        default:
3✔
4433
        }
4434

4435
        // If we have a tower client for this channel type, we'll create a
4436
        // backup for the current state.
4437
        if l.cfg.TowerClient != nil {
6✔
4438
                state := l.channel.State()
3✔
4439
                chanID := l.ChanID()
3✔
4440

3✔
4441
                err = l.cfg.TowerClient.BackupState(
3✔
4442
                        &chanID, state.RemoteCommitment.CommitHeight-1,
3✔
4443
                )
3✔
4444
                if err != nil {
3✔
NEW
4445
                        l.failf(LinkFailureError{
×
NEW
4446
                                code: ErrInternalError,
×
NEW
4447
                        }, "unable to queue breach backup: %v", err)
×
NEW
4448

×
NEW
4449
                        return err
×
NEW
4450
                }
×
4451
        }
4452

4453
        // If we can send updates then we can process adds in case we are the
4454
        // exit hop and need to send back resolutions, or in case there are
4455
        // validity issues with the packets. Otherwise we defer the action until
4456
        // resume.
4457
        //
4458
        // We are free to process the settles and fails without this check since
4459
        // processing those can't result in further updates to this channel
4460
        // link.
4461
        if l.quiescer.CanSendUpdates() {
6✔
4462
                l.processRemoteAdds(fwdPkg)
3✔
4463
        } else {
3✔
NEW
4464
                l.quiescer.OnResume(func() {
×
NEW
4465
                        l.processRemoteAdds(fwdPkg)
×
NEW
4466
                })
×
4467
        }
4468
        l.processRemoteSettleFails(fwdPkg)
3✔
4469

3✔
4470
        // If the link failed during processing the adds, we must return to
3✔
4471
        // ensure we won't attempted to update the state further.
3✔
4472
        if l.failed {
3✔
NEW
4473
                return nil
×
NEW
4474
        }
×
4475

4476
        // The revocation window opened up. If there are pending local updates,
4477
        // try to update the commit tx. Pending updates could already have been
4478
        // present because of a previously failed update to the commit tx or
4479
        // freshly added in by processRemoteAdds. Also in case there are no
4480
        // local updates, but there are still remote updates that are not in the
4481
        // remote commit tx yet, send out an update.
4482
        if l.channel.OweCommitment() {
6✔
4483
                if !l.updateCommitTxOrFail(ctx) {
3✔
NEW
4484
                        return nil
×
NEW
4485
                }
×
4486
        }
4487

4488
        // Now that we have finished processing the RevokeAndAck, we can invoke
4489
        // the flushHooks if the channel state is clean.
4490
        l.Lock()
3✔
4491
        if l.channel.IsChannelClean() {
6✔
4492
                l.flushHooks.invoke()
3✔
4493
        }
3✔
4494
        l.Unlock()
3✔
4495

3✔
4496
        return nil
3✔
4497
}
4498

4499
// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
4500
// processes it.
NEW
4501
func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
×
NEW
4502
        // Check and see if their proposed fee-rate would make us exceed the fee
×
NEW
4503
        // threshold.
×
NEW
4504
        fee := chainfee.SatPerKWeight(msg.FeePerKw)
×
NEW
4505

×
NEW
4506
        isDust, err := l.exceedsFeeExposureLimit(fee)
×
NEW
4507
        if err != nil {
×
NEW
4508
                // This shouldn't typically happen. If it does, it indicates
×
NEW
4509
                // something is wrong with our channel state.
×
NEW
4510
                l.log.Errorf("Unable to determine if fee threshold " +
×
NEW
4511
                        "exceeded")
×
NEW
4512
                l.failf(LinkFailureError{code: ErrInternalError},
×
NEW
4513
                        "error calculating fee exposure: %v", err)
×
NEW
4514

×
NEW
4515
                return err
×
NEW
4516
        }
×
4517

NEW
4518
        if isDust {
×
NEW
4519
                // The proposed fee-rate makes us exceed the fee threshold.
×
NEW
4520
                l.failf(LinkFailureError{code: ErrInternalError},
×
NEW
4521
                        "fee threshold exceeded: %v", err)
×
NEW
4522
                return err
×
NEW
4523
        }
×
4524

4525
        // We received fee update from peer. If we are the initiator we will
4526
        // fail the channel, if not we will apply the update.
NEW
4527
        if err := l.channel.ReceiveUpdateFee(fee); err != nil {
×
NEW
4528
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
NEW
4529
                        "error receiving fee update: %v", err)
×
NEW
4530
                return err
×
NEW
4531
        }
×
4532

4533
        // Update the mailbox's feerate as well.
NEW
4534
        l.mailBox.SetFeeRate(fee)
×
NEW
4535

×
NEW
4536
        return nil
×
4537
}
4538

4539
// processRemoteError takes an `Error` msg sent from the remote and fails the
4540
// channel link.
4541
func (l *channelLink) processRemoteError(msg *lnwire.Error) {
2✔
4542
        // Error received from remote, MUST fail channel, but should only print
2✔
4543
        // the contents of the error message if all characters are printable
2✔
4544
        // ASCII.
2✔
4545
        l.failf(
2✔
4546
                // TODO(halseth): we currently don't fail the channel
2✔
4547
                // permanently, as there are some sync issues with other
2✔
4548
                // implementations that will lead to them sending an
2✔
4549
                // error message, but we can recover from on next
2✔
4550
                // connection. See
2✔
4551
                // https://github.com/ElementsProject/lightning/issues/4212
2✔
4552
                LinkFailureError{
2✔
4553
                        code:             ErrRemoteError,
2✔
4554
                        PermanentFailure: false,
2✔
4555
                },
2✔
4556
                "ChannelPoint(%v): received error from peer: %v",
2✔
4557
                l.channel.ChannelPoint(), msg.Error(),
2✔
4558
        )
2✔
4559
}
2✔
4560

4561
// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local and
4562
// processes it.
4563
func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
4564
        pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {
3✔
4565

3✔
4566
        // If hodl.SettleOutgoing mode is active, we exit early to simulate
3✔
4567
        // arbitrary delays between the switch adding the SETTLE to the mailbox,
3✔
4568
        // and the HTLC being added to the commitment state.
3✔
4569
        if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
3✔
NEW
4570
                l.log.Warnf(hodl.SettleOutgoing.Warning())
×
NEW
4571
                l.mailBox.AckPacket(pkt.inKey())
×
NEW
4572

×
NEW
4573
                return
×
NEW
4574
        }
×
4575

4576
        // An HTLC we forward to the switch has just settled somewhere upstream.
4577
        // Therefore we settle the HTLC within the our local state machine.
4578
        inKey := pkt.inKey()
3✔
4579
        err := l.channel.SettleHTLC(
3✔
4580
                htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
3✔
4581
                pkt.destRef, &inKey,
3✔
4582
        )
3✔
4583
        if err != nil {
3✔
NEW
4584
                l.log.Errorf("unable to settle incoming HTLC for "+
×
NEW
4585
                        "circuit-key=%v: %v", inKey, err)
×
NEW
4586

×
NEW
4587
                // If the HTLC index for Settle response was not known to our
×
NEW
4588
                // commitment state, it has already been cleaned up by a prior
×
NEW
4589
                // response. We'll thus try to clean up any lingering state to
×
NEW
4590
                // ensure we don't continue reforwarding.
×
NEW
4591
                if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
×
NEW
4592
                        l.cleanupSpuriousResponse(pkt)
×
NEW
4593
                }
×
4594

4595
                // Remove the packet from the link's mailbox to ensure it
4596
                // doesn't get replayed after a reconnection.
NEW
4597
                l.mailBox.AckPacket(inKey)
×
NEW
4598

×
NEW
4599
                return
×
4600
        }
4601

4602
        l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
3✔
4603
                pkt.inKey(), pkt.outKey())
3✔
4604

3✔
4605
        l.closedCircuits = append(l.closedCircuits, pkt.inKey())
3✔
4606

3✔
4607
        // With the HTLC settled, we'll need to populate the wire message to
3✔
4608
        // target the specific channel and HTLC to be canceled.
3✔
4609
        htlc.ChanID = l.ChanID()
3✔
4610
        htlc.ID = pkt.incomingHTLCID
3✔
4611

3✔
4612
        // Then we send the HTLC settle message to the connected peer so we can
3✔
4613
        // continue the propagation of the settle message.
3✔
4614
        err = l.cfg.Peer.SendMessage(false, htlc)
3✔
4615
        if err != nil {
3✔
NEW
4616
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
×
NEW
4617
        }
×
4618

4619
        // Send a settle event notification to htlcNotifier.
4620
        l.cfg.HtlcNotifier.NotifySettleEvent(
3✔
4621
                newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
3✔
4622
        )
3✔
4623

3✔
4624
        // Immediately update the commitment tx to minimize latency.
3✔
4625
        l.updateCommitTxOrFail(ctx)
3✔
4626
}
4627

4628
// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local and
4629
// processes it.
4630
func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
4631
        pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {
3✔
4632

3✔
4633
        // If hodl.FailOutgoing mode is active, we exit early to simulate
3✔
4634
        // arbitrary delays between the switch adding a FAIL to the mailbox, and
3✔
4635
        // the HTLC being added to the commitment state.
3✔
4636
        if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
3✔
NEW
4637
                l.log.Warnf(hodl.FailOutgoing.Warning())
×
NEW
4638
                l.mailBox.AckPacket(pkt.inKey())
×
NEW
4639

×
NEW
4640
                return
×
NEW
4641
        }
×
4642

4643
        // An HTLC cancellation has been triggered somewhere upstream, we'll
4644
        // remove then HTLC from our local state machine.
4645
        inKey := pkt.inKey()
3✔
4646
        err := l.channel.FailHTLC(
3✔
4647
                pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef,
3✔
4648
                &inKey,
3✔
4649
        )
3✔
4650
        if err != nil {
6✔
4651
                l.log.Errorf("unable to cancel incoming HTLC for "+
3✔
4652
                        "circuit-key=%v: %v", inKey, err)
3✔
4653

3✔
4654
                // If the HTLC index for Fail response was not known to our
3✔
4655
                // commitment state, it has already been cleaned up by a prior
3✔
4656
                // response. We'll thus try to clean up any lingering state to
3✔
4657
                // ensure we don't continue reforwarding.
3✔
4658
                if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
3✔
NEW
4659
                        l.cleanupSpuriousResponse(pkt)
×
NEW
4660
                }
×
4661

4662
                // Remove the packet from the link's mailbox to ensure it
4663
                // doesn't get replayed after a reconnection.
4664
                l.mailBox.AckPacket(inKey)
3✔
4665

3✔
4666
                return
3✔
4667
        }
4668

4669
        l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
3✔
4670
                pkt.inKey(), pkt.outKey())
3✔
4671

3✔
4672
        l.closedCircuits = append(l.closedCircuits, pkt.inKey())
3✔
4673

3✔
4674
        // With the HTLC removed, we'll need to populate the wire message to
3✔
4675
        // target the specific channel and HTLC to be canceled. The "Reason"
3✔
4676
        // field will have already been set within the switch.
3✔
4677
        htlc.ChanID = l.ChanID()
3✔
4678
        htlc.ID = pkt.incomingHTLCID
3✔
4679

3✔
4680
        // We send the HTLC message to the peer which initially created the
3✔
4681
        // HTLC. If the incoming blinding point is non-nil, we know that we are
3✔
4682
        // a relaying node in a blinded path. Otherwise, we're either an
3✔
4683
        // introduction node or not part of a blinded path at all.
3✔
4684
        err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason)
3✔
4685
        if err != nil {
3✔
NEW
4686
                l.log.Errorf("unable to send HTLC failure: %v", err)
×
NEW
4687

×
NEW
4688
                return
×
NEW
4689
        }
×
4690

4691
        // If the packet does not have a link failure set, it failed further
4692
        // down the route so we notify a forwarding failure. Otherwise, we
4693
        // notify a link failure because it failed at our node.
4694
        if pkt.linkFailure != nil {
6✔
4695
                l.cfg.HtlcNotifier.NotifyLinkFailEvent(
3✔
4696
                        newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
3✔
4697
                        pkt.linkFailure, false,
3✔
4698
                )
3✔
4699
        } else {
6✔
4700
                l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
3✔
4701
                        newHtlcKey(pkt), getEventType(pkt),
3✔
4702
                )
3✔
4703
        }
3✔
4704

4705
        // Immediately update the commitment tx to minimize latency.
4706
        l.updateCommitTxOrFail(ctx)
3✔
4707
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc