lightningnetwork / lnd, coverage build 16673052227

01 Aug 2025 10:44AM UTC coverage: 67.016% (-0.03%) from 67.047%
Pull Request #9888: Attributable failures (merge 1dd8765d7 into 37523b6cb)

325 of 384 new or added lines in 16 files covered (84.64%).
131 existing lines in 24 files now uncovered.
135611 of 202355 relevant lines covered (67.02%).
21613.83 hits per line.

Source file: /htlcswitch/link.go (76.75% covered)

package htlcswitch

import (
	"bytes"
	"context"
	crand "crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	prand "math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btclog/v2"
	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/htlcswitch/hodl"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/invoices"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/lnutils"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/queue"
	"github.com/lightningnetwork/lnd/record"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/ticker"
	"github.com/lightningnetwork/lnd/tlv"
)

const (
	// DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
	// the node accepts for forwarded payments. The value is relative to the
	// current block height. The reason to have a maximum is to prevent
	// funds getting locked up unreasonably long. Otherwise, an attacker
	// willing to lock up its own funds too could force the funds of this
	// node to be locked up for an indefinite (max int32) number of blocks.
	//
	// The value 2016 corresponds, on average, to two weeks' worth of
	// blocks and is based on the maximum number of hops (20), the default
	// CLTV delta (40), and some extra margin to account for the other
	// lightning implementations and past lnd versions which used to have
	// a default CLTV delta of 144.
	DefaultMaxOutgoingCltvExpiry = 2016

	// DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
	// which a link should propose to update its commitment fee rate.
	DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

	// DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
	// which a link should propose to update its commitment fee rate.
	DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

	// DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
	// a channel's commitment fee to be of its balance. This only applies to
	// the initiator of the channel.
	DefaultMaxLinkFeeAllocation float64 = 0.5
)
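
// Illustrative arithmetic (not part of the source): at Bitcoin's ~10-minute
// block target, two weeks of blocks is 6 * 24 * 14 = 2016. With the maximum
// of 20 hops at the default CLTV delta of 40, a route needs at most
// 20 * 40 = 800 blocks, so 2016 leaves ample margin for peers that still use
// the older default delta of 144.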

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
	htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

	return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
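
// Illustrative example (not part of the source): FeeRate is expressed in
// parts per million, so with BaseFee = 1000 msat and FeeRate = 100, an HTLC
// of 1,000,000 msat is expected to pay
// 1000 + (1000000 * 100) / 1000000 = 1100 msat in fees.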

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for the channel link to
// carry out its duties.
type ChannelLinkConfig struct {
	// FwrdingPolicy is the initial forwarding policy to be used when
	// deciding whether to forward incoming HTLC's or not. This value
	// can be updated with subsequent calls to UpdateForwardingPolicy
	// targeted at a given ChannelLink concrete interface implementation.
	FwrdingPolicy models.ForwardingPolicy

	// Circuits provides restricted access to the switch's circuit map,
	// allowing the link to open and close circuits.
	Circuits CircuitModifier

	// BestHeight returns the best known height.
	BestHeight func() uint32

	// ForwardPackets attempts to forward the batch of htlcs through the
	// switch. The function returns an error if it fails to send one or
	// more packets. The link's quit signal should be provided to allow
	// cancellation of forwarding during link shutdown.
	ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

	// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
	// blobs, which are then used to inform how to forward an HTLC.
	//
	// NOTE: This function assumes the same set of readers and preimages
	// are always presented for the same identifier. The last boolean is
	// used to decide whether this is a reforwarding or not - when it's
	// reforwarding, we skip the replay check enforced in our decay log.
	DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
		[]hop.DecodeHopIteratorResponse, error)

	// ExtractSharedSecret is responsible for decoding an HTLC's Sphinx
	// onion blob, and deriving the shared secret.
	ExtractSharedSecret hop.SharedSecretGenerator

	// CreateErrorEncrypter instantiates an error encrypter based on the
	// provided encryption parameters.
	CreateErrorEncrypter func(ephemeralKey *btcec.PublicKey,
		sharedSecret sphinx.Hash256, isIntroduction,
		hasBlindingPoint bool) hop.ErrorEncrypter

	// FetchLastChannelUpdate retrieves the latest routing policy for a
	// target channel. This channel will typically be the outgoing channel
	// specified when we receive an incoming HTLC. This will be used to
	// provide payment senders our latest policy when sending encrypted
	// error messages.
	FetchLastChannelUpdate func(lnwire.ShortChannelID) (
		*lnwire.ChannelUpdate1, error)

	// Peer is a lightning network node with which we have the channel link
	// opened.
	Peer lnpeer.Peer

	// Registry is a sub-system which is responsible for managing the
	// invoices in a thread-safe manner.
	Registry InvoiceDatabase

	// PreimageCache is a global witness beacon that houses any new
	// preimages discovered by other links. We'll use this to add new
	// witnesses that we discover which will notify any sub-systems
	// subscribed to new events.
	PreimageCache contractcourt.WitnessBeacon

	// OnChannelFailure is a function closure that we'll call if the
	// channel failed for some reason. Depending on the severity of the
	// error, the closure potentially must force close this channel and
	// disconnect the peer.
	//
	// NOTE: The method must return in order for the ChannelLink to be able
	// to shut down properly.
	OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
		LinkFailureError)

	// UpdateContractSignals is a function closure that we'll use to update
	// outside sub-systems with this channel's latest ShortChannelID.
	UpdateContractSignals func(*contractcourt.ContractSignals) error

	// NotifyContractUpdate is a function closure that we'll use to update
	// the contractcourt and more specifically the ChannelArbitrator of the
	// latest channel state.
	NotifyContractUpdate func(*contractcourt.ContractUpdate) error

	// ChainEvents is an active subscription to the chain watcher for this
	// channel to be notified of any on-chain activity related to this
	// channel.
	ChainEvents *contractcourt.ChainEventSubscription

	// FeeEstimator is an instance of a live fee estimator which will be
	// used to dynamically regulate the current fee of the commitment
	// transaction to ensure timely confirmation.
	FeeEstimator chainfee.Estimator

	// HodlMask is a bitvector composed of hodl.Flags, specifying
	// breakpoints for HTLC forwarding internal to the switch.
	//
	// NOTE: This should only be used for testing.
	HodlMask hodl.Mask

	// SyncStates is used to indicate that we need to send the channel
	// reestablishment message to the remote peer. This should be done if
	// our client has been restarted, or the remote peer has been
	// reconnected.
	SyncStates bool

	// BatchTicker is the ticker that determines the interval that we'll
	// use to check the batch to see if there are any updates we should
	// flush out. By batching updates into a single commit, we attempt to
	// increase throughput by maximizing the number of updates coalesced
	// into a single commit.
	BatchTicker ticker.Ticker

	// FwdPkgGCTicker is the ticker determining the frequency at which
	// garbage collection of forwarding packages occurs. We use a
	// time-based approach, as opposed to block epochs, as to not hinder
	// syncing.
	FwdPkgGCTicker ticker.Ticker

	// PendingCommitTicker is a ticker that allows the link to determine if
	// a locally initiated commitment dance gets stuck waiting for the
	// remote party to revoke.
	PendingCommitTicker ticker.Ticker

	// BatchSize is the max size of a batch of updates done to the link
	// before we do a state update.
	BatchSize uint32

	// UnsafeReplay will cause a link to replay the adds in its latest
	// commitment txn after the link is restarted. This should only be used
	// in testing; it is here to ensure the sphinx replay detection on the
	// receiving node is persistent.
	UnsafeReplay bool

	// MinUpdateTimeout represents the minimum interval in which a link
	// will propose to update its commitment fee rate. A random timeout will
	// be selected between this and MaxUpdateTimeout.
	MinUpdateTimeout time.Duration

	// MaxUpdateTimeout represents the maximum interval in which a link
	// will propose to update its commitment fee rate. A random timeout will
	// be selected between this and MinUpdateTimeout.
	MaxUpdateTimeout time.Duration

	// OutgoingCltvRejectDelta defines the number of blocks before expiry of
	// an htlc where we no longer offer an htlc. This should be at least
	// the outgoing broadcast delta, because in any case we don't want to
	// risk offering an htlc that triggers channel closure.
	OutgoingCltvRejectDelta uint32

	// TowerClient is an optional engine that manages the signing,
	// encrypting, and uploading of justice transactions to the daemon's
	// configured set of watchtowers for legacy channels.
	TowerClient TowerClient

	// MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
	// should accept for a forwarded HTLC. The value is relative to the
	// current block height.
	MaxOutgoingCltvExpiry uint32

	// MaxFeeAllocation is the highest allocation we'll allow a channel's
	// commitment fee to be of its balance. This only applies to the
	// initiator of the channel.
	MaxFeeAllocation float64

	// MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
	// the initiator for channels of the anchor type.
	MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

	// NotifyActiveLink allows the link to tell the ChannelNotifier when a
	// link is first started.
	NotifyActiveLink func(wire.OutPoint)

	// NotifyActiveChannel allows the link to tell the ChannelNotifier when
	// a channel becomes active.
	NotifyActiveChannel func(wire.OutPoint)

	// NotifyInactiveChannel allows the switch to tell the ChannelNotifier
	// when channels become inactive.
	NotifyInactiveChannel func(wire.OutPoint)

	// NotifyInactiveLinkEvent allows the switch to tell the
	// ChannelNotifier when a channel link becomes inactive.
	NotifyInactiveLinkEvent func(wire.OutPoint)

	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
	// events through.
	HtlcNotifier htlcNotifier

	// FailAliasUpdate is a function used to fail an HTLC for an
	// option_scid_alias channel.
	FailAliasUpdate func(sid lnwire.ShortChannelID,
		incoming bool) *lnwire.ChannelUpdate1

	// GetAliases is used by the link and switch to fetch the set of
	// aliases for a given link.
	GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

	// PreviouslySentShutdown is an optional value that is set if, at the
	// time of the link being started, persisted shutdown info was found for
	// the channel. This value being set means that we previously sent a
	// Shutdown message to our peer, and so we should do so again on
	// re-establish and should not allow any more HTLC adds on the outgoing
	// direction of the link.
	PreviouslySentShutdown fn.Option[lnwire.Shutdown]

	// DisallowRouteBlinding adds the option to disable forwarding payments
	// in blinded routes by failing back any blinding-related payloads as
	// if they were invalid.
	DisallowRouteBlinding bool

	// DisallowQuiescence is a flag that can be used to disable the
	// quiescence protocol.
	DisallowQuiescence bool

	// MaxFeeExposure is the threshold in milli-satoshis after which we'll
	// restrict the flow of HTLCs and fee updates.
	MaxFeeExposure lnwire.MilliSatoshi

	// ShouldFwdExpEndorsement is a closure that indicates whether the link
	// should forward experimental endorsement signals.
	ShouldFwdExpEndorsement func() bool

	// AuxTrafficShaper is an optional auxiliary traffic shaper that can be
	// used to manage the bandwidth of the link.
	AuxTrafficShaper fn.Option[AuxTrafficShaper]

	// QuiescenceTimeout is the max duration that the channel can be
	// quiesced. Any dependent protocols (dynamic commitments, splicing,
	// etc.) must finish their operations under this timeout value,
	// otherwise the node will disconnect.
	QuiescenceTimeout time.Duration
}
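
// Illustrative sketch (an assumption, not part of the source): per the
// MinUpdateTimeout/MaxUpdateTimeout doc comments above, the link re-arms its
// fee update timer with a duration drawn uniformly between the two bounds,
// along the lines of:
//
//	spread := int64(cfg.MaxUpdateTimeout - cfg.MinUpdateTimeout)
//	timeout := cfg.MinUpdateTimeout + time.Duration(prand.Int63n(spread))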

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends the HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment protocol
// message ordering and updates.
type channelLink struct {
	// The following fields are only meant to be used *atomically*
	started       int32
	reestablished int32
	shutdown      int32

	// failed should be set to true in case a link error happens, making
	// sure we don't process any more updates.
	failed bool

	// keystoneBatch represents a volatile list of keystones that must be
	// written before attempting to sign the next commitment txn. These
	// represent all the HTLC's forwarded to the link from the switch. Once
	// we lock them into our outgoing commitment, then the circuit has a
	// keystone, and is fully opened.
	keystoneBatch []Keystone

	// openedCircuits is the set of all payment circuits that will be open
	// once we make our next commitment. After making the commitment we'll
	// ACK all these from our mailbox to ensure that they don't get
	// re-delivered if we reconnect.
	openedCircuits []CircuitKey

	// closedCircuits is the set of all payment circuits that will be
	// closed once we make our next commitment. After taking the commitment
	// we'll ACK all these to ensure that they don't get re-delivered if we
	// reconnect.
	closedCircuits []CircuitKey

	// channel is a lightning network channel to which we apply htlc
	// updates.
	channel *lnwallet.LightningChannel

	// cfg is a structure which carries all the fields and handlers the
	// service depends on; they may affect the behaviour of the service.
	cfg ChannelLinkConfig

	// mailBox is the main interface between the outside world and the
	// link. All incoming messages will be sent over this mailBox. Messages
	// include new updates from our connected peer, and new packets sent by
	// the switch to be forwarded.
	mailBox MailBox

	// upstream is a channel that new messages sent from the remote peer to
	// the local peer will be sent across.
	upstream chan lnwire.Message

	// downstream is a channel in which new multi-hop HTLC's to be
	// forwarded will be sent across. Messages from this channel are sent
	// by the HTLC switch.
	downstream chan *htlcPacket

	// updateFeeTimer is the timer responsible for updating the link's
	// commitment fee every time it fires.
	updateFeeTimer *time.Timer

	// uncommittedPreimages stores a list of all preimages that have been
	// learned since receiving the last CommitSig from the remote peer. The
	// batch will be flushed just before accepting the subsequent CommitSig
	// or on shutdown to avoid doing a write for each preimage received.
	uncommittedPreimages []lntypes.Preimage

	sync.RWMutex

	// hodlQueue is used to receive exit hop htlc resolutions from the
	// invoice registry.
	hodlQueue *queue.ConcurrentQueue

	// hodlMap stores related htlc data for a circuit key. It allows
	// resolving those htlcs when we receive a message on hodlQueue.
	hodlMap map[models.CircuitKey]hodlHtlc

	// log is a link-specific logging instance.
	log btclog.Logger

	// isOutgoingAddBlocked tracks whether the channelLink can send an
	// UpdateAddHTLC.
	isOutgoingAddBlocked atomic.Bool

	// isIncomingAddBlocked tracks whether the channelLink can receive an
	// UpdateAddHTLC.
	isIncomingAddBlocked atomic.Bool

	// flushHooks is a hookMap that is triggered when we reach a channel
	// state with no live HTLCs.
	flushHooks hookMap

	// outgoingCommitHooks is a hookMap that is triggered after we send our
	// next CommitSig.
	outgoingCommitHooks hookMap

	// incomingCommitHooks is a hookMap that is triggered after we receive
	// our next CommitSig.
	incomingCommitHooks hookMap

	// quiescer is the state machine that tracks where this channel is with
	// respect to the quiescence protocol.
	quiescer Quiescer

	// quiescenceReqs is a queue of requests to quiesce this link. The
	// members of the queue are send-only channels we should call back with
	// the result.
	quiescenceReqs chan StfuReq

	// cg is a helper that encapsulates a wait group and quit channel and
	// allows contexts that either block or cancel on those depending on
	// the use case.
	cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
	// allocIdx keeps track of the next id we haven't yet allocated.
	allocIdx atomic.Uint64

	// transient is a map of hooks that are only called the next time invoke
	// is called. These hooks are deleted during invoke.
	transient map[uint64]func()

	// newTransients is a channel that we use to accept new hooks into the
	// hookMap.
	newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
	return hookMap{
		allocIdx:      atomic.Uint64{},
		transient:     make(map[uint64]func()),
		newTransients: make(chan func()),
	}
}

// alloc allocates space in the hook map for the supplied hook and returns the
// id assigned to it. The hook is stored in the transient part of the hookMap,
// so it will only be called the next time invoke runs.
func (m *hookMap) alloc(hook func()) uint64 {
	// We assume we never overflow a uint64. Seems OK.
	hookID := m.allocIdx.Add(1)
	if hookID == 0 {
		panic("hookMap allocIdx overflow")
	}
	m.transient[hookID] = hook

	return hookID
}

// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
	for _, hook := range m.transient {
		hook()
	}

	m.transient = make(map[uint64]func())
}
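
// Illustrative usage (not part of the source): hooks registered via alloc are
// one-shot; invoke runs them once and then clears the transient set.
//
//	m := newHookMap()
//	m.alloc(func() { fmt.Println("channel flushed") })
//	m.invoke() // runs the hook and clears it
//	m.invoke() // no-op: the transient hook was already consumed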

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
	add        lnwire.UpdateAddHTLC
	sourceRef  channeldb.AddRef
	obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
	channel *lnwallet.LightningChannel) ChannelLink {

	logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

	// If the max fee exposure isn't set, use the default.
	if cfg.MaxFeeExposure == 0 {
		cfg.MaxFeeExposure = DefaultMaxFeeExposure
	}

	var qsm Quiescer
	if !cfg.DisallowQuiescence {
		qsm = NewQuiescer(QuiescerCfg{
			chanID: lnwire.NewChanIDFromOutPoint(
				channel.ChannelPoint(),
			),
			channelInitiator: channel.Initiator(),
			sendMsg: func(s lnwire.Stfu) error {
				return cfg.Peer.SendMessage(false, &s)
			},
			timeoutDuration: cfg.QuiescenceTimeout,
			onTimeout: func() {
				cfg.Peer.Disconnect(ErrQuiescenceTimeout)
			},
		})
	} else {
		qsm = &quiescerNoop{}
	}

	quiescenceReqs := make(
		chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
	)

	return &channelLink{
		cfg:                 cfg,
		channel:             channel,
		hodlMap:             make(map[models.CircuitKey]hodlHtlc),
		hodlQueue:           queue.NewConcurrentQueue(10),
		log:                 log.WithPrefix(logPrefix),
		flushHooks:          newHookMap(),
		outgoingCommitHooks: newHookMap(),
		incomingCommitHooks: newHookMap(),
		quiescer:            qsm,
		quiescenceReqs:      quiescenceReqs,
		cg:                  fn.NewContextGuard(),
	}
}
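
// Illustrative usage (not part of the source; the htlcSwitch variable and the
// surrounding wiring are hypothetical): the peer constructs a link for an
// active channel and hands it off to the switch.
//
//	link := NewChannelLink(cfg, lnChan)
//	if err := htlcSwitch.AddLink(link); err != nil {
//		// handle error
//	}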

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the channel
// link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
	if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
		err := fmt.Errorf("channel link(%v): already started", l)
		l.log.Warn("already started")
		return err
	}

	l.log.Info("starting")

	// If the config supplied a watchtower client, ensure the channel is
	// registered before trying to use it during operation.
	if l.cfg.TowerClient != nil {
		err := l.cfg.TowerClient.RegisterChannel(
			l.ChanID(), l.channel.State().ChanType,
		)
		if err != nil {
			return err
		}
	}

	l.mailBox.ResetMessages()
	l.hodlQueue.Start()

	// Before launching the htlcManager goroutine, revert any circuits that
	// were marked open in the switch's circuit map, but did not make it
	// into a commitment txn. We use the next local htlc index as the cut
	// off point, since all indexes below that are committed. This action
	// is only performed if the link's final short channel ID has been
	// assigned, otherwise we would try to trim the htlcs belonging to the
	// all-zero, hop.Source ID.
	if l.ShortChanID() != hop.Source {
		localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
		if err != nil {
			return fmt.Errorf("unable to retrieve next local "+
				"htlc index: %v", err)
		}

		// NOTE: This is automatically done by the switch when it
		// starts up, but is necessary to prevent inconsistencies in
		// the case that the link flaps. This is a result of a link's
		// life-cycle being shorter than that of the switch.
		chanID := l.ShortChanID()
		err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
		if err != nil {
			return fmt.Errorf("unable to trim circuits above "+
				"local htlc index %d: %v", localHtlcIndex, err)
		}

		// Since the link is live, before we start the link we'll update
		// the ChainArbitrator with the set of new channel signals for
		// this channel.
		//
		// TODO(roasbeef): split goroutines within channel arb to avoid
		go func() {
			signals := &contractcourt.ContractSignals{
				ShortChanID: l.channel.ShortChanID(),
			}

			err := l.cfg.UpdateContractSignals(signals)
			if err != nil {
				l.log.Errorf("unable to update signals")
			}
		}()
	}

	l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

	l.cg.WgAdd(1)
	go l.htlcManager(context.TODO())

	return nil
}

// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
	if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
		l.log.Warn("already stopped")
		return
	}

	l.log.Info("stopping")

	// As the link is stopping, we are no longer interested in htlc
	// resolutions coming from the invoice registry.
	l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

	if l.cfg.ChainEvents.Cancel != nil {
		l.cfg.ChainEvents.Cancel()
	}

	// Ensure the channel for the timer is drained.
	if l.updateFeeTimer != nil {
		if !l.updateFeeTimer.Stop() {
			select {
			case <-l.updateFeeTimer.C:
			default:
			}
		}
	}

	if l.hodlQueue != nil {
		l.hodlQueue.Stop()
	}

	l.cg.Quit()
	l.cg.WgWait()

	// Now that the htlcManager has completely exited, reset the packet
	// courier. This allows the mailbox to reevaluate any lingering Adds
	// that were delivered but didn't make it onto a commitment, so they
	// can be failed back if the link is offline for an extended period of
	// time. The error is ignored since it can only fail when the daemon is
	// exiting.
	_ = l.mailBox.ResetPackets()

	// As a final precaution, we will attempt to flush any uncommitted
	// preimages to the preimage cache. The preimages should be re-delivered
	// after channel reestablishment; however, this adds an extra layer of
	// protection in case the peer never returns. Without this, we would be
	// unable to settle any contracts depending on the preimages even though
	// we had learned them at some point.
	err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
	if err != nil {
		l.log.Errorf("unable to add preimages=%v to cache: %v",
			l.uncommittedPreimages, err)
	}
}

// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
	l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
	l.RLock()
	defer l.RUnlock()

	return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
	return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
	return l.channel.RemoteNextRevocation() != nil &&
		l.channel.ShortChanID() != hop.Source &&
		l.isReestablished() &&
		l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
	if linkDirection == Outgoing {
		return l.isOutgoingAddBlocked.Swap(false)
	}

	return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
	if linkDirection == Outgoing {
		return !l.isOutgoingAddBlocked.Swap(true)
	}

	return !l.isIncomingAddBlocked.Swap(true)
}
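
// Illustrative example (not part of the source): both methods report whether
// the call actually changed the state, since atomic.Bool.Swap yields the
// previous value.
//
//	link.DisableAdds(Outgoing) // true: outgoing adds are now blocked
//	link.DisableAdds(Outgoing) // false: they were already blocked
//	link.EnableAdds(Outgoing)  // true: outgoing adds are unblocked again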

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
// the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
	if linkDirection == Outgoing {
		return l.isOutgoingAddBlocked.Load()
	}

	return l.isIncomingAddBlocked.Load()
}

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
	select {
	case l.flushHooks.newTransients <- hook:
	case <-l.cg.Done():
	}
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever be
// called once. If no CommitSig is owed in the argument's LinkDirection, then
// this hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
	var queue chan func()

	if direction == Outgoing {
		queue = l.outgoingCommitHooks.newTransients
	} else {
		queue = l.incomingCommitHooks.newTransients
	}

	select {
	case queue <- hook:
	case <-l.cg.Done():
	}
}
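
// Illustrative usage (not part of the source): run a callback once after the
// next outgoing CommitSig is sent.
//
//	link.OnCommitOnce(Outgoing, func() {
//		log.Info("next outgoing commitment signed")
//	})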

// InitStfu allows us to initiate quiescence on this link. It returns a receive
// only channel that will block until quiescence has been achieved, or
// definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
	req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
		fn.Unit{},
	)

	select {
	case l.quiescenceReqs <- req:
	case <-l.cg.Done():
		req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
	}

	return out
}
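
// Illustrative usage (not part of the source): block until quiescence is
// reached or definitively fails.
//
//	result := <-link.InitStfu()
//	// result is an fn.Result[lntypes.ChannelParty] carrying the quiescence
//	// initiator on success, or an error such as ErrLinkShuttingDown.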

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
	return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
	atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
	state := l.channel.State()
	return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
	// We'll first query for the sat/kw recommended to be confirmed within 3
	// blocks.
	feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
	if err != nil {
		return 0, err
	}

	l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
		int64(feePerKw))

	return feePerKw, nil
}

// shouldAdjustCommitFee returns true if we should update our commitment fee to
// match that of the network fee. We'll only update our commitment fee if the
// network fee deviates from our commitment fee by 10% or more, or if our
// current commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
	minRelayFee chainfee.SatPerKWeight) bool {

	switch {
	// If the network fee is greater than our current commitment fee and
	// our current commitment fee is below the minimum relay fee then
	// we should switch to it no matter if it is less than a 10% increase.
	case netFee > chanFee && chanFee < minRelayFee:
		return true

	// If the network fee is greater than the commitment fee, then we'll
	// switch to it if it's at least 10% greater than the commit fee.
	case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
		return true

	// If the network fee is less than our commitment fee, then we'll
	// switch to it if it's at least 10% less than the commitment fee.
	case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
		return true

	// Otherwise, we won't modify our fee.
	default:
		return false
	}
}
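
// Illustrative example (not part of the source): with chanFee = 500 sat/kw
// and chanFee above minRelayFee, a netFee of 550 (exactly +10%) or 450
// (exactly -10%) triggers an update, while 549 or 451 does not. If chanFee
// were below minRelayFee, any higher netFee would trigger an update
// regardless of the 10% band.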

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
	outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

	// Determine which SCID to use in case we need to use aliases in the
	// ChannelUpdate.
	scid := outgoingScid
	if incoming {
		scid = l.ShortChanID()
	}

	// Try using the FailAliasUpdate function. If it returns nil, fallback
	// to the non-alias behavior.
	update := l.cfg.FailAliasUpdate(scid, incoming)
	if update == nil {
		// Fallback to the non-alias behavior.
		var err error
		update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
		if err != nil {
			return &lnwire.FailTemporaryNodeFailure{}
		}
	}

	return cb(update)
}

// syncChanStates attempts to synchronize channel states with the remote party.
// This method is to be called upon reconnection after the initial funding
// flow. We'll compare our commitment chains with the remote party, and re-send
// either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates(ctx context.Context) error {
	chanState := l.channel.State()

	l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

	// First, we'll generate our ChanSync message to send to the other
	// side. Based on this message, the remote party will decide if they
	// need to retransmit any data or not.
	localChanSyncMsg, err := chanState.ChanSyncMsg()
	if err != nil {
		return fmt.Errorf("unable to generate chan sync message for "+
			"ChannelPoint(%v)", l.channel.ChannelPoint())
	}
	if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
		return fmt.Errorf("unable to send chan sync message for "+
			"ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
	}

	var msgsToReSend []lnwire.Message

	// Next, we'll wait indefinitely to receive the ChanSync message. The
	// first message sent MUST be the ChanSync message.
	select {
	case msg := <-l.upstream:
		l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
			l.cfg.Peer.PubKey())

		remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
		if !ok {
			return fmt.Errorf("first message sent to sync "+
				"should be ChannelReestablish, instead "+
				"received: %T", msg)
		}

		// If the remote party indicates that they think we haven't
		// done any state updates yet, then we'll retransmit the
		// channel_ready message first. We do this, as at this point
		// we can't be sure if they've really received the
		// ChannelReady message.
		if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
			localChanSyncMsg.NextLocalCommitHeight == 1 &&
			!l.channel.IsPending() {

			l.log.Infof("resending ChannelReady message to peer")

			nextRevocation, err := l.channel.NextRevocationKey()
			if err != nil {
				return fmt.Errorf("unable to create next "+
					"revocation: %v", err)
			}

			channelReadyMsg := lnwire.NewChannelReady(
				l.ChanID(), nextRevocation,
			)

			// If this is a taproot channel, then we'll send the
			// very same nonce that we sent above, as they should
			// take the latest verification nonce we send.
			if chanState.ChanType.IsTaproot() {
				//nolint:ll
				channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
			}

			// For channels that negotiated the option-scid-alias
			// feature bit, ensure that we send over the alias in
			// the channel_ready message. We'll send the first
			// alias we find for the channel since it does not
			// matter which alias we send. We'll error out if no
			// aliases are found.
			if l.negotiatedAliasFeature() {
				aliases := l.getAliases()
				if len(aliases) == 0 {
					// This shouldn't happen since we
					// always add at least one alias before
					// the channel reaches the link.
					return fmt.Errorf("no aliases found")
				}

				// getAliases returns a copy of the alias slice
				// so it is ok to use a pointer to the first
				// entry.
				channelReadyMsg.AliasScid = &aliases[0]
			}

			err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
			if err != nil {
				return fmt.Errorf("unable to re-send "+
					"ChannelReady: %v", err)
			}
		}

		// In any case, we'll then process their ChanSync message.
		l.log.Info("received re-establishment message from remote side")

		var (
			openedCircuits []CircuitKey
			closedCircuits []CircuitKey
		)

		// We've just received a ChanSync message from the remote
		// party, so we'll process the message in order to determine
		// if we need to re-transmit any messages to the remote party.
		ctx, cancel := l.cg.Create(ctx)
		defer cancel()
		msgsToReSend, openedCircuits, closedCircuits, err =
			l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
		if err != nil {
			return err
		}

		// Repopulate any identifiers for circuits that may have been
		// opened or unclosed. This may happen if we needed to
		// retransmit a commitment signature message.
		l.openedCircuits = openedCircuits
		l.closedCircuits = closedCircuits

		// Ensure that all packets have been removed from the link's
		// mailbox.
		if err := l.ackDownStreamPackets(); err != nil {
			return err
		}

		if len(msgsToReSend) > 0 {
			l.log.Infof("sending %v updates to synchronize the "+
				"state", len(msgsToReSend))
		}

		// If we have any messages to retransmit, we'll do so
		// immediately so we return to a synchronized state as soon as
		// possible.
		for _, msg := range msgsToReSend {
			err := l.cfg.Peer.SendMessage(false, msg)
			if err != nil {
				l.log.Errorf("failed to send %v: %v",
					msg.MsgType(), err)
			}
		}

	case <-l.cg.Done():
		return ErrLinkShuttingDown
	}

	return nil
}

// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs
// we previously received are reinstated in memory, and forwarded to the switch
// if necessary. After a restart, this will also delete any previously
// completed packages.
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
	fwdPkgs, err := l.channel.LoadFwdPkgs()
	if err != nil {
		return err
	}

	l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))

	for _, fwdPkg := range fwdPkgs {
		if err := l.resolveFwdPkg(fwdPkg); err != nil {
			return err
		}
	}

	// If any of our reprocessing steps require an update to the commitment
	// txn, we initiate a state transition to capture all relevant changes.
	if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
		return l.updateCommitTx(ctx)
	}

	return nil
}

// resolveFwdPkg interprets the FwdState of the provided package, either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
	// Remove any completed packages to clear up space.
	if fwdPkg.State == channeldb.FwdStateCompleted {
		l.log.Debugf("removing completed fwd pkg for height=%d",
			fwdPkg.Height)

		err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
		if err != nil {
			l.log.Errorf("unable to remove fwd pkg for height=%d: "+
				"%v", fwdPkg.Height, err)
			return err
		}
	}

	// Otherwise this is either a new package, or one that has gone through
	// processing but still contains htlcs that need to be restored in
	// memory. We replay this forwarding package to make sure our local mem
	// state is resurrected, we mimic any original responses back to the
	// remote party, and re-forward the relevant HTLCs to the switch.

	// If the package is fully acked but not completed, it must still have
	// settles and fails to propagate.
	if !fwdPkg.SettleFailFilter.IsFull() {
		l.processRemoteSettleFails(fwdPkg)
	}

	// Finally, replay *ALL ADDS* in this forwarding package. The
	// downstream logic is able to filter out any duplicates, but we must
	// shove the entire, original set of adds down the pipeline so that the
	// batch of adds presented to the sphinx router does not ever change.
	if !fwdPkg.AckFilter.IsFull() {
		l.processRemoteAdds(fwdPkg)

		// If the link failed during processing the adds, we must
		// return to ensure we won't attempt to update the state
		// further.
		if l.failed {
			return fmt.Errorf("link failed while " +
				"processing remote adds")
		}
	}

	return nil
}

// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
	defer l.cg.WgDone()

	l.cfg.FwdPkgGCTicker.Resume()
	defer l.cfg.FwdPkgGCTicker.Stop()

	if err := l.loadAndRemove(); err != nil {
		l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
	}

	for {
		select {
		case <-l.cfg.FwdPkgGCTicker.Ticks():
			if err := l.loadAndRemove(); err != nil {
				l.log.Warnf("unable to remove fwd pkgs: %v",
					err)
				continue
			}
		case <-l.cg.Done():
			return
		}
	}
}

// loadAndRemove loads all the channel's forwarding packages and determines if
// they can be removed. It is called once before the FwdPkgGCTicker ticks so
// that a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
	fwdPkgs, err := l.channel.LoadFwdPkgs()
	if err != nil {
		return err
	}

	var removeHeights []uint64
	for _, fwdPkg := range fwdPkgs {
		if fwdPkg.State != channeldb.FwdStateCompleted {
			continue
		}

		removeHeights = append(removeHeights, fwdPkg.Height)
	}

	// If removeHeights is empty, return early so we don't use a db
	// transaction.
	if len(removeHeights) == 0 {
		return nil
	}

	return l.channel.RemoveFwdPkgs(removeHeights...)
}


// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
        l.log.Warnf("error when syncing channel states: %v", err)

        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                l.log.Debugf("unable to sync channel states, link is " +
                        "shutting down")
                return

        // We failed syncing the commit chains, probably because the remote has
        // lost state. We should force close the channel.
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
                fallthrough

        // The remote sent us an invalid last commit secret, we should force
        // close the channel.
        // TODO(halseth): and permanently ban the peer?
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
                fallthrough

        // The remote sent us a commit point different from what they sent us
        // before.
        // TODO(halseth): ban peer?
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
                // We'll fail the link and tell the peer to force close the
                // channel. Note that the database state is not updated here,
                // but will be updated when the close transaction is ready.
                // This avoids the case where we go down before storing the
                // transaction in the db.
                l.failf(
                        LinkFailureError{
                                code:          ErrSyncError,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to synchronize channel states: %v", err,
                )

        // We have lost state and cannot safely force close the channel. Fail
        // the channel and wait for the remote to hopefully force close it. The
        // remote has sent us its latest unrevoked commitment point, and we'll
        // store it in the database, such that we can attempt to recover the
        // funds if the remote force closes the channel.
        case errors.As(err, &errDataLoss):
                err := l.channel.MarkDataLoss(
                        errDataLoss.CommitPoint,
                )
                if err != nil {
                        l.log.Errorf("unable to mark channel data loss: %v",
                                err)
                }

        // We determined the commit chains were not possible to sync. We
        // cautiously fail the channel, but don't force close.
        // TODO(halseth): can we safely force close in any cases where this
        // error is returned?
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
                if err := l.channel.MarkBorked(); err != nil {
                        l.log.Errorf("unable to mark channel borked: %v", err)
                }

        // Other, unspecified error.
        default:
        }

        l.failf(
                LinkFailureError{
                        code:          ErrRecoveryError,
                        FailureAction: LinkFailureForceNone,
                },
                "unable to synchronize channel states: %v", err,
        )
}
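
// In summary (a rough guide; the switch above is authoritative): a link
// shutdown error is a no-op, data loss on the remote side or an invalid
// commit secret/point fails the link with a request to force close, local
// data loss is recorded so funds can be recovered if the remote force
// closes, and unsyncable commit chains mark the channel borked without
// force closing.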

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
// the downstream channel managed by the channel link. In the event that an
// htlc needs to be forwarded, the send-only forward handler is used, which
// sends htlc packets to the switch. Additionally, this goroutine handles
// acting upon all timeouts for any active HTLCs, manages the channel's
// revocation window, and also the htlc trickle queue+timer for this active
// channel.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager(ctx context.Context) {
        defer func() {
                l.cfg.BatchTicker.Stop()
                l.cg.WgDone()
                l.log.Infof("exited")
        }()

        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

        // Notify any clients that the link is now in the switch via an
        // ActiveLinkEvent. We'll also defer an inactive link notification for
        // when the link exits to ensure that every active notification is
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // If the link is not being started for the first time, we need to
        // take extra steps to resume its state.
        err := l.resumeLink(ctx)
        if err != nil {
                l.log.Errorf("resuming link failed: %v", err)
                return
        }

        // Now that we've received both channel_ready and channel_reestablish,
        // we can go ahead and send the active channel notification. We'll also
        // defer the inactive notification for when the link exits to ensure
        // that every active notification is matched by an inactive one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // Pause or resume the batch ticker.
                l.toggleBatchTicker()

                select {
                // We have a new hook that needs to be run when we reach a
                // clean channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
                        err := l.handleUpdateFee(ctx)
                        if err != nil {
                                l.log.Errorf("failed to handle update fee: "+
                                        "%v", err)
                        }

                // The underlying channel has notified us of a unilateral close
                // carried out by the remote peer. In the case of such an
                // event, we'll wipe the channel state from the peer, and mark
                // the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.failf(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(ctx, pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(ctx, msg)

                // An htlc resolution is received. This means that we now have
                // a resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        err := l.handleHtlcResolution(ctx, hodlItem)
                        if err != nil {
                                l.log.Errorf("failed to handle htlc "+
                                        "resolution: %v", err)
                        }

                // A user-initiated quiescence request is received. We now
                // forward it to the quiescer.
                case qReq := <-l.quiescenceReqs:
                        err := l.handleQuiescenceReq(qReq)
                        if err != nil {
                                l.log.Errorf("failed to handle quiescence "+
                                        "req: %v", err)
                        }

                case <-l.cg.Done():
                        return
                }
        }
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(ctx context.Context,
        firstResolution invoices.HtlcResolution) error {

        // Try to read all waiting resolution messages, so that they can all be
        // processed in a single commitment tx update.
        htlcResolution := firstResolution
loop:
        for {
                // Look up all hodl htlcs that can be failed or settled with
                // this event. The hodl htlc must be present in the map.
                circuitKey := htlcResolution.CircuitKey()
                hodlHtlc, ok := l.hodlMap[circuitKey]
                if !ok {
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
                }

                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
                        return err
                }

                // Clean up hodl map.
                delete(l.hodlMap, circuitKey)

                select {
                case item := <-l.hodlQueue.ChanOut():
                        htlcResolution = item.(invoices.HtlcResolution)

                // No need to process it if the link is broken.
                case <-l.cg.Done():
                        return ErrLinkShuttingDown

                default:
                        break loop
                }
        }

        // Update the commitment tx.
        if err := l.updateCommitTx(ctx); err != nil {
                return err
        }

        return nil
}

// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine required action for the resolution based on the type of
        // resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
                )

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.add.Amount)

                l.sendHTLCError(
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
                        true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}

// getResolutionFailure returns the wire message that an htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
        amount lnwire.MilliSatoshi) *LinkError {

        // If the resolution has been resolved as part of a MPP timeout,
        // we need to fail the htlc with lnwire.FailMppTimeout.
        if resolution.Outcome == invoices.ResultMppTimeout {
                return NewDetailedLinkError(
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
                )
        }

        // If the htlc is not a MPP timeout, we fail it with
        // FailIncorrectDetails. This error is sent for invoice payment
        // failures such as underpayment or expiry too soon, and for hodl
        // invoices (which return FailIncorrectDetails to avoid leaking
        // information).
        incorrectDetails := lnwire.NewFailIncorrectDetails(
                amount, uint32(resolution.AcceptHeight),
        )

        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}
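
// As a worked example: with min/max update timeouts of 10 and 60 minutes,
// prand.Int63n(upper-lower) draws uniformly from [0, 50m), and adding the
// lower bound shifts the result into [10m, 60m), so on average the link
// proposes a fee update roughly every 35 minutes.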

// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
        pkt *htlcPacket) error {

        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
        if !ok {
                return errors.New("not an UpdateAddHTLC packet")
        }

        // If we are flushing the link in the outgoing direction or we have
        // already sent Stfu, then we can't add new htlcs to the link and we
        // need to bounce it.
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        &lnwire.FailTemporaryChannelFailure{},
                        OutgoingFailureLinkNotEligible,
                )
        }

        // If hodl.AddOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding an ADD to the
        // mailbox, and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
                l.log.Warnf(hodl.AddOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())
                return nil
        }

        // Check if we can add the HTLC here without exceeding the max fee
        // exposure threshold.
        if l.isOverexposedWithHtlc(htlc, false) {
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
                        "exposure exceeded")

                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        // A new payment has been initiated via the downstream channel,
        // so we add the new HTLC to our local log, then update the
        // commitment chains.
        htlc.ChanID = l.ChanID()
        openCircuitRef := pkt.inKey()

        // We enforce the fee buffer for the commitment transaction because
        // we are in control of adding this htlc. Nothing has locked-in yet so
        // we can securely enforce the fee buffer which is only relevant if we
        // are the initiator of the channel.
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
        if err != nil {
                // The HTLC was unable to be added to the state machine,
                // as a result, we'll signal the switch to cancel the
                // pending payment.
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
                        err)

                // Remove this packet from the link's mailbox, this
                // prevents it from being reprocessed if the link
                // restarts and resets its mailbox. If this response
                // doesn't make it back to the originating link, it will
                // be rejected upon attempting to reforward the Add to
                // the switch, since the circuit was never fully opened,
                // and the forwarding package shows it as
                // unacknowledged.
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
                "local_log_index=%v, pend_updates=%v",
                htlc.PaymentHash[:], index,
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

        pkt.outgoingChanID = l.ShortChanID()
        pkt.outgoingHTLCID = index
        htlc.ID = index

        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

        err = l.cfg.Peer.SendMessage(false, htlc)
        if err != nil {
                l.log.Errorf("failed to send UpdateAddHTLC: %v", err)
        }

        // Send a forward event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
                newHtlcKey(pkt),
                HtlcInfo{
                        IncomingTimeLock: pkt.incomingTimeout,
                        IncomingAmt:      pkt.incomingAmount,
                        OutgoingTimeLock: htlc.Expiry,
                        OutgoingAmt:      htlc.Amount,
                },
                getEventType(pkt),
        )

        l.tryBatchUpdateCommitTx(ctx)

        return nil
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
        pkt *htlcPacket) {

        if pkt.htlc.MsgType().IsChannelUpdate() &&
                !l.quiescer.CanSendUpdates() {

                l.log.Warnf("unable to process channel update. "+
                        "ChannelID=%v is quiescent.", l.ChanID())

                return
        }

        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)

        case *lnwire.UpdateFulfillHTLC:
                l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc)

        case *lnwire.UpdateFailHTLC:
                l.processLocalUpdateFailHTLC(ctx, pkt, htlc)
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if pending < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail(ctx)
}
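
// For example, with a hypothetical BatchSize of 10, the commitment is only
// re-signed here once ten local updates are pending on the remote
// commitment; smaller partial batches are instead flushed by the
// BatchTicker case in htlcManager.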

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this link
        // from resending the packet to the switch. To do so, we ack the AddRef
        // of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal of
                // the destination reference or circuit, so we exit early. The
                // cleanup may proceed with a different packet in the future
                // that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to teardown the response reference and circuit
        // map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the circuit.
        // When retransmitting responses, the destination references will be
        // cleaned up if an open circuit is not found in the circuit map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

        // With all known references acked, we can now safely delete the
        // circuit from the switch's circuit map, as the state is no longer
        // needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer with whom we
// have a direct channel, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
        msg lnwire.Message) {

        l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())

        // First check if the message is an update and we are capable of
        // receiving updates right now.
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
                l.stfuFailf("update received after stfu: %T", msg)
                return
        }

        var err error

        switch msg := msg.(type) {
        case *lnwire.UpdateAddHTLC:
                err = l.processRemoteUpdateAddHTLC(msg)

        case *lnwire.UpdateFulfillHTLC:
                err = l.processRemoteUpdateFulfillHTLC(msg)

        case *lnwire.UpdateFailMalformedHTLC:
                err = l.processRemoteUpdateFailMalformedHTLC(msg)

        case *lnwire.UpdateFailHTLC:
                err = l.processRemoteUpdateFailHTLC(msg)

        case *lnwire.CommitSig:
                err = l.processRemoteCommitSig(ctx, msg)

        case *lnwire.RevokeAndAck:
                err = l.processRemoteRevokeAndAck(ctx, msg)

        case *lnwire.UpdateFee:
                err = l.processRemoteUpdateFee(msg)

        case *lnwire.Stfu:
                err = l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err.Error())
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                l.processRemoteError(msg)

        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }

        if err != nil {
                l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
                        err)
        }
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}
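
// A typical quiescence exchange when the peer initiates, from this link's
// point of view (a sketch only; the quiescer owns the authoritative state
// machine):
//
//	<- stfu    received from the peer, handled by handleStfu above
//	-> stfu    sent once we have no dangling local updates
//
// Once both stfu messages have been exchanged, the channel is quiescent
// and channel update messages are rejected in both directions.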

// stfuFailf fails the link in the case where the requirements of the
// quiescence protocol are violated. In all cases we opt to drop the connection
// as only link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are 0 updates originally issued by
// the given party that are still pending on either the Local or Remote
// commitment transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}
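
// Put differently: an update counts as dangling until it has been
// irrevocably committed on *both* commitment transactions, which is the
// precondition the quiescence handshake above relies on.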

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the server, and cleaning up any circuits closed
// by signing a previous commitment txn. This method ensures that the circuits
// are removed from the circuit map before removing them from the link's
// mailbox, otherwise it could be possible for some circuit to be missed if
// this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from being
        // replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets. We
        // batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now safe
        // to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}

// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
        err := l.updateCommitTx(ctx)
        switch {
        // No error encountered, success.
        case err == nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx(ctx context.Context) error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting
        // here permits testing of either the switch or link's ability to trim
        // circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        ctx, done := l.cg.Create(ctx)
        defer done()

        newCommit, err := l.channel.SignNextCommitment(ctx)
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes=%v", n,
                        lnutils.SpewLogClosure(l.openedCircuits),
                        lnutils.SpewLogClosure(l.closedCircuits))

                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.cg.Done():
                return ErrLinkShuttingDown
        default:
        }

        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
        if err != nil {
                return fmt.Errorf("error parsing aux sigs: %w", err)
        }

        commitSig := &lnwire.CommitSig{
                ChanID:        l.ChanID(),
                CommitSig:     newCommit.CommitSig,
                HtlcSigs:      newCommit.HtlcSigs,
                PartialSig:    newCommit.PartialSig,
                CustomRecords: auxBlobRecords,
        }
        err = l.cfg.Peer.SendMessage(false, commitSig)
        if err != nil {
                l.log.Errorf("failed to send CommitSig: %v", err)
        }

        // Now that we have sent out a new CommitSig, we invoke the outgoing
        // set of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}
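
// Note the ordering enforced by updateCommitTx: keystones are persisted
// via OpenCircuits *before* signing, so a crash right after
// SignNextCommitment cannot leave an HTLC on a commitment without a
// matching circuit, while downstream packets are only acked *after* a
// signature was produced, so an unsigned batch is simply redelivered from
// the mailbox on restart.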

// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from
        // disk. This ensures that the channel state accurately reflects the
        // updated short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

        return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis. This
// is used so that the Switch can have access to the commitment fee without
// needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
        if remote {
                return l.channel.State().RemoteCommitment.CommitFee
        }

        return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate and taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments using
        // this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}
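
// A rough worked example with purely illustrative numbers: given a dry-run
// local dust sum of 100,000 msat and a local commit fee of 10,000 sat at
// the proposed fee-rate, the total local exposure is 100,000 + 10,000,000
// = 10,100,000 msat. Only if this tally (or its remote counterpart)
// exceeds cfg.MaxFeeExposure is the proposed fee-rate rejected.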

// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
// the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)

                return true
        }

        return false
}
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
2322
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
2323
// the HTLC is incoming (i.e. one that the remote sent), a boolean denoting
2324
// whether to evaluate on the local or remote commit, and finally an HTLC
2325
// amount to test.
2326
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
2327
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
2328

2329
// dustHelper is used to construct the dustClosure.
2330
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
2331
        remoteDustLimit btcutil.Amount) dustClosure {
1,788✔
2332

1,788✔
2333
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
1,788✔
2334
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
11,835✔
2335

10,047✔
2336
                var dustLimit btcutil.Amount
10,047✔
2337
                if whoseCommit.IsLocal() {
15,072✔
2338
                        dustLimit = localDustLimit
5,025✔
2339
                } else {
10,050✔
2340
                        dustLimit = remoteDustLimit
5,025✔
2341
                }
5,025✔
2342

2343
                return lnwallet.HtlcIsDust(
10,047✔
2344
                        chantype, incoming, whoseCommit, feerate, amt,
10,047✔
2345
                        dustLimit,
10,047✔
2346
                )
10,047✔
2347
        }
2348

2349
        return isDust
1,788✔
2350
}
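
// An illustrative sketch (arbitrary limits and feerate, not called anywhere
// in the package) of how the closure returned by dustHelper is evaluated
// against one of the two commitments:
func exampleDustClosureUsage() {
        isDust := dustHelper(
                channeldb.SingleFunderTweaklessBit, 546, 354,
        )

        // Classify a 1,000 satoshi incoming HTLC on our local commitment at
        // a feerate of 253 sat/kw.
        feeRate := chainfee.SatPerKWeight(253)
        dust := isDust(feeRate, true, lntypes.Local, btcutil.Amount(1_000))

        log.Debugf("htlc considered dust: %v", dust)
}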

// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {

        l.Lock()
        l.cfg.FailAliasUpdate = closure
        l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
        l.Lock()
        l.mailBox = mailbox
        l.upstream = mailbox.MessageOutBox()
        l.downstream = mailbox.PacketOutBox()
        l.Unlock()

        // Set the mailbox's fee rate. This may be refreshing a feerate that
        // was never committed.
        l.mailBox.SetFeeRate(l.getFeeRate())

        // Also set the mailbox's dust closure so that it can query whether
        // HTLCs are dust given the current feerate.
        l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. We assume that
// fields that are zero are intentionally set to zero, so we'll use newPolicy
// to update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
        newPolicy models.ForwardingPolicy) {

        l.Lock()
        defer l.Unlock()

        l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing fee
        // this incoming HTLC must carry in order to satisfy the constraints
        // of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount.
        // However, rounding may cause the result to be slightly higher than
        // in the case of separately rounded fee components. This potentially
        // causes failed forwards for senders and is something to be avoided.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/dated
        // information to construct the forwarding information for this hop.
        // In any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most up to
                // date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our
        // time-lock delta should equal the outgoing time lock. Otherwise,
        // either the sender messed up, or an intermediate node tampered with
        // the HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock "+
                        "value: expected at least %v block delta, got %v "+
                        "block delta", payHash[:], timeDelta,
                        incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}
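
// A worked sketch (arbitrary policy values, not called anywhere in the
// package) of the two-component fee calculation above, where the outbound
// and inbound components are rounded separately before being summed:
func exampleExpectedFee() {
        policy := models.ForwardingPolicy{
                BaseFee: 1_000, // 1 sat outbound base fee, in msat.
                FeeRate: 100,   // Proportional fee of 100 ppm.
        }
        inboundFee := models.InboundFee{Base: -200} // 0.2 sat discount.

        amtToForward := lnwire.MilliSatoshi(1_000_000) // Forward 1,000 sat.

        // Outbound fee on the forwarded amount, then the inbound fee on the
        // sum of the forwarded amount and the outbound fee.
        outFee := ExpectedFee(policy, amtToForward)
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        expectedFee := inFee + int64(outFee)
        log.Debugf("expected total forwarding fee: %v msat", expectedFee)
}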

// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback
        // mechanism to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // As our first sanity check, we'll ensure that the passed HTLC isn't
        // too small for the next hop. If so, then we'll cancel the HTLC
        // directly.
        if amt < policy.MinHTLCOut {
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
                        amt)

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most up to
                // date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Next, ensure that the passed HTLC isn't too large. If so, we'll
        // cancel the HTLC directly.
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most
                // up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureHTLCExceedsMax,
                )
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()
        auxBandwidth, err := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if err != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
                auxBandwidth.Bandwidth.WhenSome(
                        func(bandwidth lnwire.MilliSatoshi) {
                                availableBandwidth = bandwidth
                        },
                )
        }

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, availableBandwidth)
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}
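
// The policy checks above bound an outgoing HTLC in four ways: an amount
// floor (MinHTLCOut), an amount ceiling (MaxHTLC), a minimum expiry distance
// (OutgoingCltvRejectDelta) and a maximum expiry distance
// (MaxOutgoingCltvExpiry). An illustrative sketch of just the amount bounds
// (not called anywhere in the package):
func exampleAmountBounds(policy models.ForwardingPolicy,
        amt lnwire.MilliSatoshi) bool {

        if amt < policy.MinHTLCOut {
                return false // Too small for this channel's policy.
        }
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                return false // Exceeds the advertised maximum.
        }

        return true
}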

// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. This will always return fn.None() for a
// regular (non-custom) channel.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(OptionalBandwidth{
                        IsHandled: false,
                })
        }

        peerBytes := l.cfg.Peer.PubKey()

        peer, err := route.NewVertexFromBytes(peerBytes[:])
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to "+
                        "decode peer pub key: %v", err))
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
                l.channel.FetchLatestAuxHTLCView(), peer,
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(OptionalBandwidth{
                IsHandled: true,
                Bandwidth: fn.Some(auxBandwidth),
        })
}

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles switch packets. These packets might be forwarded
// to us from another channel link in case the htlc update came from another
// peer, or if the update was created by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles htlc requests such as settle/add/fail which are
// sent to us from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.cg.Done():
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}
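
// The select above is the standard non-blocking shutdown guard: it never
// waits, it only drops work once the done channel has been closed. A generic
// sketch of the same idiom (illustrative only, not called anywhere in the
// package):
func quitGuard(done <-chan struct{}) bool {
        select {
        case <-done:
                // Already shutting down, reject new work.
                return false
        default:
                // Still running, accept the work.
                return true
        }
}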

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
        feePerKw chainfee.SatPerKWeight) error {

        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the
        // fee threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock
        // it in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx(ctx)
}

// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent
// to the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                l.log.Trace("fwd package has no settle/fails to process, " +
                        "exiting early")

                return
        }

        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which
                // will handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not
                        // signal a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has
                        // been freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failureCode message for a previously forwarded HTLC has
                // been received. As a result a new slot will be freed up in
                // our commitment state, so we'll forward this to the switch
                // so the backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the
                        // switch to an actual error, by encrypting it as if
                        // we were the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has
                        // been freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero
        // number of them.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}
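
// A failure that originally arrived as an update_fail_malformed_htlc is
// re-encoded locally without an HMAC, so its opaque reason is exactly the
// fixed failure message length plus the two 2-byte length prefixes. A sketch
// (illustrative only, not called anywhere in the package) of the detection
// used above:
func isConvertedError(reason []byte) bool {
        convertedErrorSize := lnwire.FailureMessageLength + 4

        return len(reason) == convertedErrorSize
}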

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote
// party. The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed,
// or whether we are reprocessing as a result of a failure or restart. Adds
// that have already been acknowledged in the forwarding package will be
// ignored.
//
// NOTE: This function also needs to be called for fwd packages with no ADDs
// because it marks the fwdPkg as processed by writing the FwdFilter into the
// database.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        // decodeReqs is a list of requests sent to the onion decoder. We
        // expect the same length of responses to be returned.
        decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))

        // unackedAdds is a list of ADDs that's waiting for the remote's
        // settle/fail update.
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))

        for i, update := range fwdPkg.Adds {
                // If this index is already found in the ack filter, the
                // response to this forwarding decision has already been
                // committed by one of our commitment txns. ADDs in this state
                // are waiting for the rest of the fwding package to get acked
                // before being garbage collected.
                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(uint16(i)) {

                        continue
                }

                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                        unackedAdds = append(unackedAdds, msg)
                }
        }

        // If the fwdPkg has already been processed, it means we are
        // reforwarding the packets again, which happens only on a restart.
        reforward := fwdPkg.State == channeldb.FwdStateProcessed

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.CodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs, reforward,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range unackedAdds {
                idx := uint16(i)
                sourceRef := fwdPkg.SourceRef(idx)
                add := *update

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information
                // with the DecodeHopIterator function which processes the
                // Sphinx packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop iterator "+
                                "for htlc(id=%v, hash=%x): %v", add.ID,
                                add.PaymentHash, failureCode)

                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // Let's extract the error encrypter parameters.
                        ephemeralKey, sharedSecret, blindingPoint, failCode :=
                                chanIterator.ExtractEncrypterParams(
                                        l.cfg.ExtractSharedSecret,
                                )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not. Let's create
                        // the error encrypter based on the extracted
                        // encryption parameters.
                        obfuscator := l.cfg.CreateErrorEncrypter(
                                ephemeralKey, sharedSecret,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                                blindingPoint.IsSome(),
                        )

                        // TODO: currently none of the test unit
                        // infrastructure is setup to handle TLV payloads, so
                        // testing this would require implementing a separate
                        // mock iterator for TLV payloads that also supports
                        // injecting invalid payloads. Deferring this
                        // non-trivial effort till a later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Extract the encryption parameters.
                ephemeralKey, sharedSecret, blindingPoint, failureCode :=
                        chanIterator.ExtractEncrypterParams(
                                l.cfg.ExtractSharedSecret,
                        )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                // Instantiate an error encrypter based on the extracted
                // encryption parameters.
                obfuscator := l.cfg.CreateErrorEncrypter(
                        ephemeralKey, sharedSecret,
                        routeRole == hop.RouteRoleIntroduction,
                        blindingPoint.IsSome(),
                )

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, err.Error()) //nolint

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If
                        // the fwd pkg has already been processed, then we
                        // entered the above section to recreate a previous
                        // error. If the packet had previously been forwarded,
                        // it would have been added to switchPackets at the
                        // top of this section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has
        // not been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
                len(switchPackets), reforward)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the
        // link. Failing to do this could cause reorderings/gaps in the range
        // of opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(reforward, switchPackets...)
}
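
// processRemoteAdds runs in two modes, distinguished purely by the package
// state: FwdStateLockedIn is a first pass that populates the FwdFilter, while
// FwdStateProcessed is a reforward after restart that only replays adds
// already marked in the filter. A sketch of that dispatch (illustrative only,
// not called anywhere in the package):
func shouldForwardAdd(state channeldb.FwdState,
        filter *channeldb.PkgFilter, idx uint16) bool {

        switch state {
        case channeldb.FwdStateLockedIn:
                // First pass: every unacked add is a candidate.
                return true

        case channeldb.FwdStateProcessed:
                // Reforward: only adds previously marked for forwarding.
                return filter.Contains(idx)

        default:
                // Completed packages are never reforwarded.
                return false
        }
}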

// experimentalEndorsement returns the value to set for our outgoing
// experimental endorsement field, or none if it should not be populated on
// the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay the experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field; consider it invalid if
        // it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of
        // information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}
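
// The relay rule above collapses to: forward "endorsed" only when the
// incoming add was explicitly endorsed, and "unendorsed" in every other case
// (no records, missing field, empty value, or an invalid encoding). A compact
// sketch of the same mapping (illustrative only, not called anywhere in the
// package):
func relayEndorsement(value []byte, set bool) byte {
        if set && len(value) > 0 && value[0] == lnwire.ExperimentalEndorsed {
                return lnwire.ExperimentalEndorsed
        }

        return lnwire.ExperimentalUnendorsed
}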
3402

// processExitHop handles an htlc for which this link is the exit hop. It
// returns an error if the htlc could not be processed.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
        fwdInfo hop.ForwardingInfo, heightNow uint32,
        payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final hop's
        // ADD, nor will we settle the corresponding invoice or respond with the
        // preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)

                return nil
        }

        // In case the traffic shaper is active, we'll check if the HTLC has
        // custom records and skip the amount check in the onion payload below.
        isCustomHTLC := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) bool {
                        return ts.IsCustomHTLC(add.CustomRecords)
                },
        )

        // As we're the exit hop, we'll double check the hop-payload included in
        // the HTLC to ensure that it was crafted correctly by the sender and
        // is compatible with the HTLC we were extended. If an external
        // validator is active we might bypass the amount check.
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v",
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
                )
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if add.Expiry < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
                )

                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(add.PaymentHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: add.ID,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide whether it is resolved now or
        // later.
        htlc := hodlHtlc{
                add:        add,
                sourceRef:  sourceRef,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}
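
// To make the two payload checks above concrete (numbers are illustrative
// only): if the onion payload of an exit-hop htlc carries
// AmountToForward = 1,000,000 msat and OutgoingCTLV = 800,000, then the
// incoming update_add_htlc must satisfy add.Amount >= 1,000,000 and
// add.Expiry >= 800,000. Anything lower is failed back via sendHTLCError with
// final_incorrect_htlc_amount or final_incorrect_cltv_expiry respectively.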

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // Now that the HTLC was successfully settled locally, send a
        // notification about it to the remote peer.
        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
        }

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        filteredPkts := make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, attrData, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        extraData, err := lnwire.AttrDataToExtraData(attrData)
        if err != nil {
                l.log.Errorf("unable to encode attributable data: %v", err)
                return
        }

        err = l.channel.FailHTLC(
                add.ID, reason, extraData, &sourceRef, nil, nil,
        )
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason, extraData,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter, originalFailure lnwire.OpaqueReason,
        extraData lnwire.ExtraOpaqueData) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments, and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID:    l.ChanID(),
                        ID:        htlcIndex,
                        Reason:    originalFailure,
                        ExtraData: extraData,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID:    l.ChanID(),
                        ID:        htlcIndex,
                        Reason:    originalFailure,
                        ExtraData: extraData,
                }

        // When we're the introduction node, we need to convert the error to
        // a UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, _, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %d", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
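
// Summarizing the dispatch above (informal; the authoritative behavior is
// the switch statement itself):
//
//     encrypter                outgoing message
//     ---------                ----------------
//     nil (local payment)      update_fail_htlc with the original reason
//     cleartext (non-blinded)  update_fail_htlc with the original reason
//     introduction node        update_fail_htlc carrying an encrypted
//                              invalid_blinding failure
//     relaying (blinded)       update_fail_malformed_htlc with
//                              CodeInvalidBlinding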

// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err)
        }
}

// failf encapsulates the actions necessary for properly failing the link. It
// takes a LinkFailureError, which will be passed to the OnChannelFailure
// closure, in order for it to determine if we should force close the channel,
// and if we should send an error message to the remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}

// handleHtlcResolution takes an HTLC resolution and processes it by draining
// the hodlQueue. Once processed, a commit_sig is sent to the remote to update
// their commitment.
func (l *channelLink) handleHtlcResolution(ctx context.Context,
        hodlItem any) error {

        htlcResolution, ok := hodlItem.(invoices.HtlcResolution)
        if !ok {
                return fmt.Errorf("expected HtlcResolution, got %T", hodlItem)
        }

        err := l.processHodlQueue(ctx, htlcResolution)
        // No error, success.
        if err == nil {
                return nil
        }

        switch {
        // If the duplicate keystone error was encountered, fail back
        // gracefully.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(
                        LinkFailureError{
                                code: ErrCircuitError,
                        },
                        "process hodl queue: temporary circuit error: %v", err,
                )

        // Send an Error message to the peer.
        default:
                l.failf(
                        LinkFailureError{
                                code: ErrInternalError,
                        },
                        "process hodl queue: unable to update commitment: %v",
                        err,
                )
        }

        return err
}

// handleQuiescenceReq takes a locally initialized (RPC) quiescence request and
// forwards it to the quiescer for further processing.
func (l *channelLink) handleQuiescenceReq(req StfuReq) error {
        l.quiescer.InitStfu(req)

        if !l.noDanglingUpdates(lntypes.Local) {
                return nil
        }

        err := l.quiescer.SendOwedStfu()
        if err != nil {
                l.stfuFailf("SendOwedStfu: %s", err.Error())
                res := fn.Err[lntypes.ChannelParty](err)
                req.Resolve(res)
        }

        return err
}

// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to
// decide whether we should send an `update_fee` msg to update the commitment's
// feerate.
func (l *channelLink) handleUpdateFee(ctx context.Context) error {
        // If we're not the initiator of the channel, we don't control the fees,
        // so we can ignore this.
        if !l.channel.IsInitiator() {
                return nil
        }

        // If we are the initiator, then we'll sample the current fee rate to
        // get into the chain within 3 blocks.
        netFee, err := l.sampleNetworkFee()
        if err != nil {
                return fmt.Errorf("unable to sample network fee: %w", err)
        }

        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

        newCommitFee := l.channel.IdealCommitFeeRate(
                netFee, minRelayFee,
                l.cfg.MaxAnchorsCommitFeeRate,
                l.cfg.MaxFeeAllocation,
        )

        // We determine if we should adjust the commitment fee based on the
        // current commitment fee, the suggested new commitment fee and the
        // current minimum relay fee rate.
        commitFee := l.channel.CommitFeeRate()
        if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) {
                return nil
        }

        // If we do, then we'll send a new UpdateFee message to the remote
        // party, to be locked in with a new update.
        err = l.updateChannelFee(ctx, newCommitFee)
        if err != nil {
                return fmt.Errorf("unable to update fee rate: %w", err)
        }

        return nil
}
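
// A rough numeric sketch of the gate above (values are illustrative; the
// exact thresholds live in shouldAdjustCommitFee elsewhere in this package):
// with commitFee = 2,500 sat/kw, an ideal rate of 5,000 sat/kw and a relay
// floor of 253 sat/kw, the deviation is large enough that the link proposes
// an update_fee for the new rate, while a near-identical sample would be
// skipped to avoid churning the commitment with tiny fee updates.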

// toggleBatchTicker checks whether we need to resume or pause the batch ticker.
// When we have no pending updates, the ticker is paused, otherwise resumed.
func (l *channelLink) toggleBatchTicker() {
        // If the previous event resulted in a non-empty batch, resume the batch
        // ticker so that it can be cleared. Otherwise pause the ticker to
        // prevent waking up the htlcManager while the batch is empty.
        numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if numUpdates > 0 {
                l.cfg.BatchTicker.Resume()
                l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+
                        "Remote)=%d", numUpdates)

                return
        }

        l.cfg.BatchTicker.Pause()
        l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
                "(Local, Remote)")
}

// resumeLink is called when starting a previously created link. It runs
// through the reestablishment protocol and reforwards packets that are not
// yet resolved.
func (l *channelLink) resumeLink(ctx context.Context) error {
        // If this isn't the first time that this channel link has been created,
        // then we'll need to check to see if we need to re-synchronize state
        // with the remote peer. settledHtlcs is a map of HTLC's that we
        // re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates(ctx)
                if err != nil {
                        l.handleChanSyncErr(err)

                        return err
                }
        }

        // If a shutdown message has previously been sent on this link, then we
        // need to make sure that we have disabled any HTLC adds on the outgoing
        // direction of the link and that we re-send the same shutdown message
        // that we previously sent.
        //
        // TODO(yy): we should either move this to chanCloser, or move all
        // shutdown handling logic to be managed by the link, but not a mix of
        // partial management by two subsystems.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since
                // syncChanStates would have sent any outstanding CommitSig, it
                // is fine for us to immediately queue the shutdown message now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })

        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // With the channel states synced, we now reset the mailbox to ensure we
        // start processing all unacked packets in order. This is done here to
        // ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull unacked
        // packets after starting to read from the downstream mailbox.
        err := l.mailBox.ResetPackets()
        if err != nil {
                l.log.Errorf("failed to reset packets: %v", err)
        }

        // If the channel is pending, there's no need to reforward packets.
        if l.ShortChanID() == hop.Source {
                return nil
        }

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We will
        // only attempt to resolve packages if our short chan id indicates that
        // the channel is not pending, otherwise we should have no htlcs to
        // reforward.
        err = l.resolveFwdPkgs(ctx)
        switch {
        // No error was encountered, success.
        case err == nil:
                // With our link's in-memory state fully reconstructed, spawn a
                // goroutine to manage the reclamation of disk space occupied by
                // completed forwarding packages.
                l.cg.WgAdd(1)
                go l.fwdPkgGarbager()

                return nil

        // If the duplicate keystone error was encountered, we'll fail without
        // sending an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)

        // A non-nil error was encountered, send an Error message to
        // the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to resolve fwd pkgs: %v", err)
        }

        return err
}

// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote
// and processes it.
func (l *channelLink) processRemoteUpdateAddHTLC(
        msg *lnwire.UpdateAddHTLC) error {

        if l.IsFlushing(Incoming) {
                // This is forbidden by the protocol specification. The best
                // chance we have to deal with this is to drop the connection.
                // This should roll back the channel state to the last
                // CommitSig. If the remote has already sent a CommitSig we
                // haven't received yet, channel state will be re-synchronized
                // with a ChannelReestablish message upon reconnection and the
                // protocol state that caused us to flush the link will be
                // rolled back. In the event that there was some
                // non-deterministic behavior in the remote that caused them to
                // violate the protocol, we have a decent shot at correcting it
                // this way, since reconnecting will put us in the cleanest
                // possible state to try again.
                //
                // In addition to the above, it is possible for us to hit this
                // case in situations where we improperly handle message
                // ordering due to concurrency choices. An issue has been filed
                // to address this here:
                // https://github.com/lightningnetwork/lnd/issues/8393
                err := errors.New("received add while link is flushing")
                l.failf(
                        LinkFailureError{
                                code:             ErrInvalidUpdate,
                                FailureAction:    LinkFailureDisconnect,
                                PermanentFailure: false,
                                Warning:          true,
                        }, err.Error(),
                )

                return err
        }

        // Disallow htlcs with blinding points set if we haven't enabled the
        // feature. This saves us from having to process the onion at all, but
        // will only catch blinded payments where we are a relaying node (as the
        // blinding point will be in the payload when we're the introduction
        // node).
        if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
                err := errors.New("blinding point included when route " +
                        "blinding is disabled")

                l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error())

                return err
        }

        // We have to check the limit here rather than later in the switch
        // because the counterparty can keep sending HTLC's without sending a
        // revoke. This would mean that the switch check would only occur later.
        if l.isOverexposedWithHtlc(msg, true) {
                err := errors.New("peer sent us an HTLC that exceeded our " +
                        "max fee exposure")
                l.failf(LinkFailureError{code: ErrInternalError}, err.Error())

                return err
        }

        // We just received an add request from an upstream peer, so we add it
        // to our state machine, then add the HTLC to our "settle" list in the
        // event that we know the preimage.
        index, err := l.channel.ReceiveHTLC(msg)
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream add HTLC: %v", err)

                return err
        }

        l.log.Tracef("receive upstream htlc with payment hash(%x), "+
                "assigning index: %v", msg.PaymentHash[:], index)

        return nil
}

// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFulfillHTLC(
        msg *lnwire.UpdateFulfillHTLC) error {

        pre := msg.PaymentPreimage
        idx := msg.ID

        // Before we pipeline the settle, we'll check the set of active htlc's
        // to see if the related UpdateAddHTLC has been fully locked-in.
        var lockedin bool
        htlcs := l.channel.ActiveHtlcs()
        for _, add := range htlcs {
                // The HTLC will be outgoing and match idx.
                if !add.Incoming && add.HtlcIndex == idx {
                        lockedin = true
                        break
                }
        }

        if !lockedin {
                err := errors.New("unable to handle upstream settle")
                l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error())

                return err
        }

        if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidUpdate,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to handle upstream settle HTLC: %v", err,
                )

                return err
        }

        settlePacket := &htlcPacket{
                outgoingChanID: l.ShortChanID(),
                outgoingHTLCID: idx,
                htlc: &lnwire.UpdateFulfillHTLC{
                        PaymentPreimage: pre,
                },
        }

        // Add the newly discovered preimage to our growing list of uncommitted
        // preimages. These will be written to the witness cache just before
        // accepting the next commitment signature from the remote peer.
        l.uncommittedPreimages = append(l.uncommittedPreimages, pre)

        // Pipeline this settle, send it to the switch.
        go l.forwardBatch(false, settlePacket)

        return nil
}

// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg
// sent from the remote and processes it.
func (l *channelLink) processRemoteUpdateFailMalformedHTLC(
        msg *lnwire.UpdateFailMalformedHTLC) error {

        // Convert the failure type encoded within the HTLC fail message to the
        // proper generic lnwire error code.
        var failure lnwire.FailureMessage
        switch msg.FailureCode {
        case lnwire.CodeInvalidOnionVersion:
                failure = &lnwire.FailInvalidOnionVersion{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionHmac:
                failure = &lnwire.FailInvalidOnionHmac{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionKey:
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        // Handle malformed errors that are part of a blinded route. This case
        // is slightly different, because we expect every relaying node in the
        // blinded portion of the route to send malformed errors. If we're also
        // a relaying node, we're likely going to switch this error out anyway
        // for our own malformed error, but we handle the case here for
        // completeness.
        case lnwire.CodeInvalidBlinding:
                failure = &lnwire.FailInvalidBlinding{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        default:
                l.log.Warnf("unexpected failure code received in "+
                        "UpdateFailMalformedHTLC: %v", msg.FailureCode)

                // We don't just pass back the error we received from our
                // successor. Otherwise we might report a failure that penalizes
                // us more than needed. If the onion that we forwarded was
                // correct, the node should have been able to send back its own
                // failure. The node did not send back its own failure, so we
                // assume there was a problem with the onion and report that
                // back. We reuse the invalid onion key failure because there is
                // no specific error for this case.
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }
        }

        // With the error parsed, we'll convert it into its opaque form.
        var b bytes.Buffer
        if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                return fmt.Errorf("unable to encode malformed error: %w", err)
        }

        // If the remote side was unable to parse the onion blob we sent to it,
        // then we should transform the malformed HTLC message into the usual
        // HTLC fail message.
        err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes(), msg.ExtraData)
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}

// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFailHTLC(
        msg *lnwire.UpdateFailHTLC) error {

        // Verify that the failure reason is at least 256 bytes plus overhead.
        const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32

        if len(msg.Reason) < minimumFailReasonLength {
                // We've received a reason with a non-compliant length. Older
                // nodes happily relay back these failures that may originate
                // from a node further downstream. Therefore we can't just fail
                // the channel.
                //
                // We want to be compliant ourselves, so we also can't pass back
                // the reason unmodified. And we must make sure that we don't
                // hit the magic length check of 260 bytes in
                // processRemoteSettleFails either.
                //
                // Because the reason is unreadable for the payer anyway, we
                // just replace it by a compliant-length series of random bytes.
                msg.Reason = make([]byte, minimumFailReasonLength)
                _, err := crand.Read(msg.Reason[:])
                if err != nil {
                        return fmt.Errorf("random generation error: %w", err)
                }
        }

        // Add fail to the update log.
        idx := msg.ID
        err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:], msg.ExtraData)
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}
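
// For reference, the minimum length used above decomposes according to
// BOLT #4's failure packet layout: 32 (hmac) + 2 (failure message length) +
// lnwire.FailureMessageLength (256) + 2 (pad length) = 292 bytes, the
// shortest spec-compliant failure reason.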

// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteCommitSig(ctx context.Context,
        msg *lnwire.CommitSig) error {

        // Since we may have learned new preimages for the first time, we'll add
        // them to our preimage cache. By doing this, we ensure any contested
        // contracts watched by any on-chain arbitrators can now sweep this HTLC
        // on-chain. We delay committing the preimages until just before
        // accepting the new remote commitment, as afterwards the peer won't
        // resend the Settle messages on the next channel reestablishment. Doing
        // so allows us to more effectively batch this operation, instead of
        // doing a single write per preimage.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInternalError},
                        "unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err,
                )

                return err
        }

        // Instead of truncating the slice to conserve memory allocations, we
        // simply set the uncommitted preimage slice to nil so that a new one
        // will be initialized if any more witnesses are discovered. We do this
        // because the maximum size that the slice can occupy is 15KB, and we
        // want to ensure we release that memory back to the runtime.
        l.uncommittedPreimages = nil

        // We just received new updates to our local commitment chain; validate
        // this new commitment, closing the link if it's invalid.
        auxSigBlob, err := msg.CustomRecords.Serialize()
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInvalidCommitment},
                        "unable to serialize custom records: %v", err,
                )

                return err
        }
        err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                CommitSig:  msg.CommitSig,
                HtlcSigs:   msg.HtlcSigs,
                PartialSig: msg.PartialSig,
                AuxSigBlob: auxSigBlob,
        })
        if err != nil {
                // If we were unable to reconstruct their proposed commitment,
                // then we'll examine the type of error. If it's an
                // InvalidCommitSigError, then we'll send a direct error.
                var sendData []byte
                switch {
                case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err):
                        sendData = []byte(err.Error())
                case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err):
                        sendData = []byte(err.Error())
                }
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidCommitment,
                                FailureAction: LinkFailureForceClose,
                                SendData:      sendData,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As we've just accepted a new state, we'll now immediately send the
        // remote peer a revocation for our prior state.
        nextRevocation, currentHtlcs, finalHTLCs, err :=
                l.channel.RevokeCurrentCommitment()
        if err != nil {
                l.log.Errorf("unable to revoke commitment: %v", err)

                // We need to fail the channel in case revoking our local
                // commitment does not succeed. We might have already advanced
                // our channel state which would lead us to proceed with an
                // unclean state.
                //
                // NOTE: We do not trigger a force close because this could
                // resolve itself in case our db was just busy not accepting new
                // transactions.
                l.failf(
                        LinkFailureError{
                                code:          ErrInternalError,
                                Warning:       true,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As soon as we are ready to send our next revocation, we can invoke
        // the incoming commit hooks.
        l.Lock()
        l.incomingCommitHooks.invoke()
        l.Unlock()

        err = l.cfg.Peer.SendMessage(false, nextRevocation)
        if err != nil {
                l.log.Errorf("failed to send RevokeAndAck: %v", err)
        }

        // Notify the incoming htlcs for which the resolutions were locked in.
        for id, settled := range finalHTLCs {
                l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                        models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: id,
                        },
                        channeldb.FinalHtlcInfo{
                                Settled:  settled,
                                Offchain: true,
                        },
                )
        }

        // Since we just revoked our commitment, we may have a new set of HTLC's
        // on our commitment, so we'll send them using our function closure
        // NotifyContractUpdate.
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.LocalHtlcSet,
                Htlcs:   currentHtlcs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If the remote party initiated the state transition, we'll reply with
        // a signature to provide them with their version of the latest
        // commitment. Otherwise, if both commitment chains are fully synced
        // from our PoV, we don't need to reply with a signature as both sides
        // already have a commitment with the latest accepted state.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // If we need to send out an Stfu, this would be the time to do so.
        if l.noDanglingUpdates(lntypes.Local) {
                err = l.quiescer.SendOwedStfu()
                if err != nil {
                        l.stfuFailf("sendOwedStfu: %v", err.Error())
                }
        }

        // Now that we have finished processing the incoming CommitSig and sent
        // out our RevokeAndAck, we invoke the flushHooks if the channel state
        // is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}
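
// Schematically, the happy path implemented above is one leg of the BOLT #2
// commitment dance (informal sketch):
//
//     peer -> us:   commitment_signed
//     us   -> peer: revoke_and_ack
//     us   -> peer: commitment_signed (only if we still owe a signature)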

// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context,
        msg *lnwire.RevokeAndAck) error {

        // We've received a revocation from the remote chain. If valid, this
        // moves the remote chain forward and expands our revocation window.

        // We now process the message and advance our remote commit chain.
        fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
        if err != nil {
                // TODO(halseth): force close?
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidRevocation,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "unable to accept revocation: %v", err,
                )

                return err
        }

        // The remote party now has a new primary commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemoteHtlcSet,
                Htlcs:   remoteHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If we have a tower client for this channel type, we'll create a
        // backup for the current state.
        if l.cfg.TowerClient != nil {
                state := l.channel.State()
                chanID := l.ChanID()

                err = l.cfg.TowerClient.BackupState(
                        &chanID, state.RemoteCommitment.CommitHeight-1,
                )
                if err != nil {
                        l.failf(LinkFailureError{
                                code: ErrInternalError,
                        }, "unable to queue breach backup: %v", err)

                        return err
                }
        }

        // If we can send updates then we can process adds in case we are the
        // exit hop and need to send back resolutions, or in case there are
        // validity issues with the packets. Otherwise we defer the action until
        // resume.
        //
        // We are free to process the settles and fails without this check since
        // processing those can't result in further updates to this channel
        // link.
        if l.quiescer.CanSendUpdates() {
                l.processRemoteAdds(fwdPkg)
        } else {
                l.quiescer.OnResume(func() {
                        l.processRemoteAdds(fwdPkg)
                })
        }
        l.processRemoteSettleFails(fwdPkg)

        // If the link failed during processing the adds, we must return to
        // ensure we won't attempt to update the state further.
        if l.failed {
                return nil
        }

        // The revocation window opened up. If there are pending local updates,
        // try to update the commit tx. Pending updates could already have been
        // present because of a previously failed update to the commit tx or
        // freshly added in by processRemoteAdds. Also in case there are no
        // local updates, but there are still remote updates that are not in the
        // remote commit tx yet, send out an update.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // Now that we have finished processing the RevokeAndAck, we can invoke
        // the flushHooks if the channel state is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}

// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
        // Check and see if their proposed fee-rate would make us exceed the fee
        // threshold.
        fee := chainfee.SatPerKWeight(msg.FeePerKw)

        isDust, err := l.exceedsFeeExposureLimit(fee)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                l.log.Errorf("Unable to determine if fee threshold " +
                        "exceeded")
                l.failf(LinkFailureError{code: ErrInternalError},
                        "error calculating fee exposure: %v", err)

                return err
        }

        if isDust {
                // The proposed fee-rate makes us exceed the fee threshold.
                // Note that err is nil at this point, so we construct a fresh
                // error to fail the link with and return it.
                err := fmt.Errorf("fee threshold exceeded with fee rate %v",
                        fee)
                l.failf(LinkFailureError{code: ErrInternalError}, err.Error())

                return err
        }

        // We received a fee update from the peer. If we are the initiator we
        // will fail the channel, if not we will apply the update.
        if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "error receiving fee update: %v", err)
                return err
        }

        // Update the mailbox's feerate as well.
        l.mailBox.SetFeeRate(fee)

        return nil
}

4581
// processRemoteError takes an `Error` msg sent from the remote and fails the
4582
// channel link.
4583
func (l *channelLink) processRemoteError(msg *lnwire.Error) {
2✔
4584
        // Error received from remote, MUST fail channel, but should only print
2✔
4585
        // the contents of the error message if all characters are printable
2✔
4586
        // ASCII.
2✔
4587
        l.failf(
2✔
4588
                // TODO(halseth): we currently don't fail the channel
2✔
4589
                // permanently, as there are some sync issues with other
2✔
4590
                // implementations that will lead to them sending an
2✔
4591
                // error message, but we can recover from on next
2✔
4592
                // connection. See
2✔
4593
                // https://github.com/ElementsProject/lightning/issues/4212
2✔
4594
                LinkFailureError{
2✔
4595
                        code:             ErrRemoteError,
2✔
4596
                        PermanentFailure: false,
2✔
4597
                },
2✔
4598
                "ChannelPoint(%v): received error from peer: %v",
2✔
4599
                l.channel.ChannelPoint(), msg.Error(),
2✔
4600
        )
2✔
4601
}
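
// The "printable ASCII" rule referenced above can be made concrete with a
// short sketch. This illustrative helper (not lnwire's implementation)
// renders a remote error's payload as text only when every byte is printable
// ASCII, and falls back to hex otherwise so that control characters never
// reach the logs:
func sanitizeRemoteError(data []byte) string {
	for _, b := range data {
		if b < 32 || b > 126 {
			// A non-printable byte was found, so render the raw
			// bytes as hex instead.
			return fmt.Sprintf("unprintable error: %x", data)
		}
	}

	return string(data)
}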

// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local
// node and processes it.
func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
	pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {

	// If hodl.SettleOutgoing mode is active, we exit early to simulate
	// arbitrary delays between the switch adding the SETTLE to the mailbox,
	// and the HTLC being added to the commitment state.
	if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
		l.log.Warnf(hodl.SettleOutgoing.Warning())
		l.mailBox.AckPacket(pkt.inKey())

		return
	}

	// An HTLC we forwarded to the switch has just settled somewhere
	// upstream. Therefore we settle the HTLC within our local state
	// machine.
	inKey := pkt.inKey()
	err := l.channel.SettleHTLC(
		htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
		pkt.destRef, &inKey,
	)
	if err != nil {
		l.log.Errorf("unable to settle incoming HTLC for "+
			"circuit-key=%v: %v", inKey, err)

		// If the HTLC index for the Settle response was not known to
		// our commitment state, it has already been cleaned up by a
		// prior response. We'll thus try to clean up any lingering
		// state to ensure we don't continue reforwarding.
		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
			l.cleanupSpuriousResponse(pkt)
		}

		// Remove the packet from the link's mailbox to ensure it
		// doesn't get replayed after a reconnection.
		l.mailBox.AckPacket(inKey)

		return
	}

	l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
		pkt.inKey(), pkt.outKey())

	l.closedCircuits = append(l.closedCircuits, pkt.inKey())

	// With the HTLC settled, we'll need to populate the wire message to
	// target the specific channel and HTLC to be settled.
	htlc.ChanID = l.ChanID()
	htlc.ID = pkt.incomingHTLCID

	// Then we send the HTLC settle message to the connected peer so we can
	// continue the propagation of the settle message.
	err = l.cfg.Peer.SendMessage(false, htlc)
	if err != nil {
		l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
	}

	// Send a settle event notification to htlcNotifier.
	l.cfg.HtlcNotifier.NotifySettleEvent(
		newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
	)

	// Immediately update the commitment tx to minimize latency.
	l.updateCommitTxOrFail(ctx)
}
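
// lnutils.ErrorAs, used above to detect lnwallet.ErrUnknownHtlcIndex, is a
// generic convenience around the standard library's errors.As. A minimal
// sketch of such a helper (illustrative; see the lnutils package for the
// real one):
func errorAsSketch[T error](err error) bool {
	// errors.As walks the wrapped-error chain looking for a match and,
	// unlike a bare type assertion, sees through fmt.Errorf("%w", ...).
	var target T
	return errors.As(err, &target)
}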

// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local node
// and processes it.
func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
	pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {

	// If hodl.FailOutgoing mode is active, we exit early to simulate
	// arbitrary delays between the switch adding a FAIL to the mailbox, and
	// the HTLC being added to the commitment state.
	if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
		l.log.Warnf(hodl.FailOutgoing.Warning())
		l.mailBox.AckPacket(pkt.inKey())

		return
	}

	// An HTLC cancellation has been triggered somewhere upstream, so we'll
	// remove the HTLC from our local state machine.
	inKey := pkt.inKey()
	err := l.channel.FailHTLC(
		pkt.incomingHTLCID, htlc.Reason, htlc.ExtraData, pkt.sourceRef,
		pkt.destRef, &inKey,
	)
	if err != nil {
		l.log.Errorf("unable to cancel incoming HTLC for "+
			"circuit-key=%v: %v", inKey, err)

		// If the HTLC index for the Fail response was not known to our
		// commitment state, it has already been cleaned up by a prior
		// response. We'll thus try to clean up any lingering state to
		// ensure we don't continue reforwarding.
		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
			l.cleanupSpuriousResponse(pkt)
		}

		// Remove the packet from the link's mailbox to ensure it
		// doesn't get replayed after a reconnection.
		l.mailBox.AckPacket(inKey)

		return
	}

	l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
		pkt.inKey(), pkt.outKey())

	l.closedCircuits = append(l.closedCircuits, pkt.inKey())

	// With the HTLC removed, we'll need to populate the wire message to
	// target the specific channel and HTLC to be canceled. The "Reason"
	// field will have already been set within the switch.
	htlc.ChanID = l.ChanID()
	htlc.ID = pkt.incomingHTLCID

	// We send the HTLC message to the peer which initially created the
	// HTLC. If the incoming blinding point is non-nil, we know that we are
	// a relaying node in a blinded path. Otherwise, we're either an
	// introduction node or not part of a blinded path at all.
	err = l.sendIncomingHTLCFailureMsg(
		htlc.ID, pkt.obfuscator, htlc.Reason, htlc.ExtraData,
	)
	if err != nil {
		l.log.Errorf("unable to send HTLC failure: %v", err)

		return
	}

	// If the packet has a link failure set, the HTLC failed at our node,
	// so we notify a link failure. Otherwise, it failed further down the
	// route and we notify a forwarding failure.
	if pkt.linkFailure != nil {
		l.cfg.HtlcNotifier.NotifyLinkFailEvent(
			newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
			pkt.linkFailure, false,
		)
	} else {
		l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
			newHtlcKey(pkt), getEventType(pkt),
		)
	}

	// Immediately update the commitment tx to minimize latency.
	l.updateCommitTxOrFail(ctx)
}
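
// The hodl.Mask checks at the top of the two functions above gate test-only
// delay injection behind a bitmask of named flags. The sketch below shows
// the underlying pattern with hypothetical names (the real flags live in the
// hodl package):
type sketchHodlMask uint32

const (
	sketchSettleOutgoing sketchHodlMask = 1 << iota
	sketchFailOutgoing
)

// active reports whether the given flag is set in the mask, mirroring calls
// like l.cfg.HodlMask.Active(hodl.SettleOutgoing) above.
func (m sketchHodlMask) active(flag sketchHodlMask) bool {
	return m&flag != 0
}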