lightningnetwork / lnd, build 17917482292

22 Sep 2025 01:50PM UTC. Coverage: 56.562% (-10.1%) from 66.668%.

Pull Request #10182: Aux feature bits (merge 9efe3bd8c into 055fb436e)

32 of 68 new or added lines in 5 files covered (47.06%).
29734 existing lines in 467 files now uncovered.
98449 of 174056 relevant lines covered (56.56%), at 1.18 hits per line.

Source file: /htlcswitch/link.go (66.33% covered)
package htlcswitch

import (
        "bytes"
        "context"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to the
        // current block height. The reason to have a maximum is to prevent
        // funds getting locked up unreasonably long. Otherwise, an attacker
        // willing to lock its own funds too, could force the funds of this node
        // to be locked up for an indefinite (max int32) number of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other lightning
        // implementations and past lnd versions which used to have a default
        // CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies to
        // the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
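
// Example (illustrative sketch, not part of link.go; the helper name and the
// policy values are invented): ExpectedFee applies FeeRate as a
// parts-per-million rate on top of the flat BaseFee, all in fixed-point
// millisatoshi math.
func exampleExpectedFee() {
        policy := models.ForwardingPolicy{
                BaseFee: 1000, // 1 sat flat fee, expressed in msat.
                FeeRate: 100,  // Proportional fee of 100 ppm.
        }

        // 1,000 + (100,000,000*100)/1,000,000 = 11,000 msat.
        fmt.Println(ExpectedFee(policy, 100_000_000))
}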

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
        // blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier. The last boolean is
        // used to decide whether this is a reforwarding or not - when it's
        // reforwarding, we skip the replay check enforced in our decay log.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter is responsible for decoding an HTLC's Sphinx
        // onion blob, and creating an onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel link
        // opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be able
        // to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to update
        // outside sub-systems with this channel's latest ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of the
        // latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if our
        // clients have been restarted, or the remote peer has been
        // reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there're any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine if
        // a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be used
        // in testing, it is here to ensure the sphinx replay detection on the
        // receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry
        // of an htlc where we no longer offer an htlc. This should be at least
        // the outgoing broadcast delta, because in any case we don't want to
        // risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
        // should accept for a forwarded HTLC. The value is relative to the
        // current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
        // a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when a channel becomes inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
        // events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found for
        // the channel. This value being set means that we previously sent a
        // Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the outgoing
        // direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding payments
        // in blinded routes by failing back any blinding-related payloads as
        // if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the link
        // should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
        // used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]

        // AuxChannelNegotiator is an optional interface that allows aux channel
        // implementations to inject and process custom records over channel
        // related wire messages.
        AuxChannelNegotiator fn.Option[lnwallet.AuxChannelNegotiator]

        // QuiescenceTimeout is the max duration that the channel can be
        // quiesced. Any dependent protocols (dynamic commitments, splicing,
        // etc.) must finish their operations under this timeout value,
        // otherwise the node will disconnect.
        QuiescenceTimeout time.Duration
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends the HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment protocol
// message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*.
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch. Once
        // we lock them into our outgoing commitment, then the circuit has a
        // keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the commitment
        // we'll ACK all these to ensure that they don't get re-delivered if we
        // reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox. Messages
        // include new updates from our connected peer, and new packets to be
        // forwarded sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer to
        // the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer. The
        // batch will be flushed just before accepting the subsequent CommitSig
        // or on shutdown to avoid doing a write for each preimage received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from invoice
        // registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send our
        // next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is with
        // respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back with
        // the result.
        quiescenceReqs chan StfuReq

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time invoke
        // is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook and returns the
// ID under which it was registered.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
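
// Example (illustrative sketch, not part of link.go; the helper name is
// invented): transient hooks registered via alloc fire exactly once, on the
// next invoke, and are cleared afterwards.
func exampleHookMap() {
        m := newHookMap()
        m.alloc(func() { fmt.Println("fired") })

        m.invoke() // Prints "fired", then clears the transient set.
        m.invoke() // No output: the hook was removed by the first invoke.
}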

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: cfg.QuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                cg:                  fn.NewContextGuard(),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the channel
// link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager messages, revert any circuits that
        // were marked open in the switch's circuit map, but did not make it
        // into a commitment txn. We use the next local htlc index as the cut
        // off point, since all indexes below that are committed. This action
        // is only performed if the link's final short channel ID has been
        // assigned, otherwise we would try to trim the htlcs belonging to the
        // all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll update
                // the ChainArbitrator with the set of new channel signals for
                // this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.cg.WgAdd(1)
        go l.htlcManager(context.TODO())

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        l.cg.Quit()
        l.cg.WgWait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to re-evaluate any lingering Adds
        // that were delivered but didn't make it onto a commitment, so they
        // can be failed back if the link is offline for an extended period of
        // time. The error is ignored since it can only fail when the daemon is
        // exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be re-delivered
        // after channel reestablishment, however this adds an extra layer of
        // protection in case the peer never returns. Without this, we will be
        // unable to settle any contracts depending on the preimages even though
        // we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}
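
// Example (illustrative sketch, not part of link.go; the helper name is
// invented): the stop-then-drain sequence used in Stop above is the standard
// idiom for halting a time.Timer without leaving a stale tick buffered in its
// channel.
func stopAndDrainTimer(t *time.Timer) {
        // Stop reports false if the timer already fired; in that case a tick
        // may still sit in t.C, so drain it without blocking.
        if !t.Stop() {
                select {
                case <-t.C:
                default:
                }
        }
}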

// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}
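
// Example (illustrative sketch, not part of link.go; the helper name is
// invented): the Swap-based toggles report whether the call actually changed
// state, so redundant transitions are detectable.
func exampleFlushControl(l *channelLink) {
        fmt.Println(l.DisableAdds(Outgoing)) // true: outgoing adds now blocked.
        fmt.Println(l.DisableAdds(Outgoing)) // false: already blocked.
        fmt.Println(l.IsFlushing(Outgoing))  // true.

        fmt.Println(l.EnableAdds(Outgoing)) // true: adds re-enabled.
        fmt.Println(l.IsFlushing(Outgoing)) // false.
}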

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.cg.Done():
        }
}
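
// Example (illustrative sketch, not part of link.go; the helper name is
// invented): block a caller until the link next reaches a state with no live
// HTLCs.
func exampleWaitForFlush(l *channelLink) {
        flushed := make(chan struct{})

        // The hook runs at most once, so closing the channel is safe.
        l.OnFlushedOnce(func() {
                close(flushed)
        })

        <-flushed
}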

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever be
// called once. If no CommitSig is owed in the argument's LinkDirection, then
// the hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.cg.Done():
        }
}

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive-only channel that will block until quiescence has been achieved, or
// definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.cg.Done():
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within 3
        // blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}

// shouldAdjustCommitFee returns true if we should update our commitment fee to
// match that of the network fee. We'll only update our commitment fee if the
// network fee is +/- 10% of our commitment fee or if our current commitment
// fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
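
// Example (illustrative sketch, not part of link.go; the helper name and fee
// values are invented): the 10% band acts as hysteresis around the current
// commitment fee, with an escape hatch when we're under the minimum relay fee.
func exampleShouldAdjustCommitFee() {
        const minRelay = chainfee.SatPerKWeight(253)

        fmt.Println(shouldAdjustCommitFee(1100, 1000, minRelay)) // true: at least +10%.
        fmt.Println(shouldAdjustCommitFee(1050, 1000, minRelay)) // false: within the band.
        fmt.Println(shouldAdjustCommitFee(900, 1000, minRelay))  // true: at least -10%.
        fmt.Println(shouldAdjustCommitFee(300, 100, minRelay))   // true: 100 is below min relay.
}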

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fallback
        // to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}
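
// Example (illustrative sketch, not part of link.go; the variable name is
// invented, and the use of lnwire.NewTemporaryChannelFailure here is an
// assumption about a typical callback): a failCb generally wraps the resolved
// ChannelUpdate into a concrete wire failure so the sender learns our latest
// policy.
var exampleFailCb failCb = func(
        update *lnwire.ChannelUpdate1) lnwire.FailureMessage {

        return lnwire.NewTemporaryChannelFailure(update)
}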
896

897
// syncChanState attempts to synchronize channel states with the remote party.
898
// This method is to be called upon reconnection after the initial funding
899
// flow. We'll compare out commitment chains with the remote party, and re-send
900
// either a danging commit signature, a revocation, or both.
901
func (l *channelLink) syncChanStates(ctx context.Context) error {
2✔
902
        chanState := l.channel.State()
2✔
903

2✔
904
        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
2✔
905

2✔
906
        // First, we'll generate our ChanSync message to send to the other
2✔
907
        // side. Based on this message, the remote party will decide if they
2✔
908
        // need to retransmit any data or not.
2✔
909
        localChanSyncMsg, err := chanState.ChanSyncMsg()
2✔
910
        if err != nil {
2✔
911
                return fmt.Errorf("unable to generate chan sync message for "+
×
912
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
×
913
        }
×
914
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
2✔
915
                return fmt.Errorf("unable to send chan sync message for "+
×
916
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
×
917
        }
×
918

919
        var msgsToReSend []lnwire.Message
2✔
920

2✔
921
        // Next, we'll wait indefinitely to receive the ChanSync message. The
2✔
922
        // first message sent MUST be the ChanSync message.
2✔
923
        select {
2✔
924
        case msg := <-l.upstream:
2✔
925
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
2✔
926
                        l.cfg.Peer.PubKey())
2✔
927

2✔
928
                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
2✔
929
                if !ok {
2✔
930
                        return fmt.Errorf("first message sent to sync "+
×
931
                                "should be ChannelReestablish, instead "+
×
932
                                "received: %T", msg)
×
933
                }
×
934

935
                // If the remote party indicates that they think we haven't
936
                // done any state updates yet, then we'll retransmit the
937
                // channel_ready message first. We do this, as at this point
938
                // we can't be sure if they've really received the
939
                // ChannelReady message.
940
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
2✔
941
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
2✔
942
                        !l.channel.IsPending() {
4✔
943

2✔
944
                        l.log.Infof("resending ChannelReady message to peer")
2✔
945

2✔
946
                        nextRevocation, err := l.channel.NextRevocationKey()
2✔
947
                        if err != nil {
2✔
948
                                return fmt.Errorf("unable to create next "+
×
949
                                        "revocation: %v", err)
×
950
                        }
×
951

952
                        channelReadyMsg := lnwire.NewChannelReady(
2✔
953
                                l.ChanID(), nextRevocation,
2✔
954
                        )
2✔
955

2✔
956
                        // If this is a taproot channel, then we'll send the
2✔
957
                        // very same nonce that we sent above, as they should
2✔
958
                        // take the latest verification nonce we send.
2✔
959
                        if chanState.ChanType.IsTaproot() {
4✔
960
                                //nolint:ll
2✔
961
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
2✔
962
                        }
2✔
963

964
                        // For channels that negotiated the option-scid-alias
965
                        // feature bit, ensure that we send over the alias in
966
                        // the channel_ready message. We'll send the first
967
                        // alias we find for the channel since it does not
968
                        // matter which alias we send. We'll error out if no
969
                        // aliases are found.
970
                        if l.negotiatedAliasFeature() {
4✔
971
                                aliases := l.getAliases()
2✔
972
                                if len(aliases) == 0 {
2✔
973
                                        // This shouldn't happen since we
×
974
                                        // always add at least one alias before
×
975
                                        // the channel reaches the link.
×
976
                                        return fmt.Errorf("no aliases found")
×
977
                                }
×
978

979
                                // getAliases returns a copy of the alias slice
980
                                // so it is ok to use a pointer to the first
981
                                // entry.
982
                                channelReadyMsg.AliasScid = &aliases[0]
2✔
983
                        }
984

985
                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
2✔
986
                        if err != nil {
2✔
987
                                return fmt.Errorf("unable to re-send "+
×
988
                                        "ChannelReady: %v", err)
×
989
                        }
×
990
                }
991

992
                // In any case, we'll then process their ChanSync message.
993
                l.log.Info("received re-establishment message from remote side")
2✔
994

2✔
995
                // If we have an AuxChannelNegotiator we notify any external
2✔
996
                // component for this message. This serves as a notification
2✔
997
                // that the reestablish message was received.
2✔
998
                l.cfg.AuxChannelNegotiator.WhenSome(
2✔
999
                        func(acn lnwallet.AuxChannelNegotiator) {
2✔
NEW
1000
                                fundingPoint := l.channel.ChannelPoint()
×
NEW
1001
                                cid := lnwire.NewChanIDFromOutPoint(
×
NEW
1002
                                        fundingPoint,
×
NEW
1003
                                )
×
NEW
1004

×
NEW
1005
                                acn.ProcessReestablish(
×
NEW
1006
                                        cid, l.cfg.Peer.PubKey(),
×
NEW
1007
                                )
×
NEW
1008
                        },
×
1009
                )
1010

1011
                var (
2✔
1012
                        openedCircuits []CircuitKey
2✔
1013
                        closedCircuits []CircuitKey
2✔
1014
                )
2✔
1015

2✔
1016
                // We've just received a ChanSync message from the remote
2✔
1017
                // party, so we'll process the message  in order to determine
2✔
1018
                // if we need to re-transmit any messages to the remote party.
2✔
1019
                ctx, cancel := l.cg.Create(ctx)
2✔
1020
                defer cancel()
2✔
1021
                msgsToReSend, openedCircuits, closedCircuits, err =
2✔
1022
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
2✔
1023
                if err != nil {
4✔
1024
                        return err
2✔
1025
                }
2✔
1026

1027
                // Repopulate any identifiers for circuits that may have been
1028
                // opened or unclosed. This may happen if we needed to
1029
                // retransmit a commitment signature message.
1030
                l.openedCircuits = openedCircuits
2✔
1031
                l.closedCircuits = closedCircuits
2✔
1032

2✔
1033
                // Ensure that all packets have been have been removed from the
2✔
1034
                // link's mailbox.
2✔
1035
                if err := l.ackDownStreamPackets(); err != nil {
2✔
1036
                        return err
×
1037
                }
×
1038

1039
                if len(msgsToReSend) > 0 {
2✔
UNCOV
1040
                        l.log.Infof("sending %v updates to synchronize the "+
×
UNCOV
1041
                                "state", len(msgsToReSend))
×
UNCOV
1042
                }
×
1043

1044
                // If we have any messages to retransmit, we'll do so
1045
                // immediately so we return to a synchronized state as soon as
1046
                // possible.
1047
                for _, msg := range msgsToReSend {
2✔
UNCOV
1048
                        err := l.cfg.Peer.SendMessage(false, msg)
×
UNCOV
1049
                        if err != nil {
×
1050
                                l.log.Errorf("failed to send %v: %v",
×
1051
                                        msg.MsgType(), err)
×
1052
                        }
×
1053
                }
1054

1055
        case <-l.cg.Done():
2✔
1056
                return ErrLinkShuttingDown
2✔
1057
        }
1058

1059
        return nil
2✔
1060
}
1061

1062
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1063
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1064
// we previously received are reinstated in memory, and forwarded to the switch
1065
// if necessary. After a restart, this will also delete any previously
1066
// completed packages.
1067
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
2✔
1068
        fwdPkgs, err := l.channel.LoadFwdPkgs()
2✔
1069
        if err != nil {
2✔
1070
                return err
×
1071
        }
×
1072

1073
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
2✔
1074

2✔
1075
        for _, fwdPkg := range fwdPkgs {
4✔
1076
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
2✔
UNCOV
1077
                        return err
×
UNCOV
1078
                }
×
1079
        }
1080

1081
        // If any of our reprocessing steps require an update to the commitment
1082
        // txn, we initiate a state transition to capture all relevant changes.
1083
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
4✔
1084
                return l.updateCommitTx(ctx)
2✔
1085
        }
2✔
1086

1087
        return nil
2✔
1088
}
1089

1090
// resolveFwdPkg interprets the FwdState of the provided package, either
1091
// reprocesses any outstanding htlcs in the package, or performs garbage
1092
// collection on the package.
1093
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
2✔
1094
        // Remove any completed packages to clear up space.
2✔
1095
        if fwdPkg.State == channeldb.FwdStateCompleted {
4✔
1096
                l.log.Debugf("removing completed fwd pkg for height=%d",
2✔
1097
                        fwdPkg.Height)
2✔
1098

2✔
1099
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
2✔
1100
                if err != nil {
2✔
UNCOV
1101
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
UNCOV
1102
                                "%v", fwdPkg.Height, err)
×
UNCOV
1103
                        return err
×
UNCOV
1104
                }
×
1105
        }
1106

1107
        // Otherwise this is either a new package or one has gone through
1108
        // processing, but contains htlcs that need to be restored in memory.
1109
        // We replay this forwarding package to make sure our local mem state
1110
        // is resurrected, we mimic any original responses back to the remote
1111
        // party, and re-forward the relevant HTLCs to the switch.
1112

1113
        // If the package is fully acked but not completed, it must still have
1114
        // settles and fails to propagate.
1115
        if !fwdPkg.SettleFailFilter.IsFull() {
4✔
1116
                l.processRemoteSettleFails(fwdPkg)
2✔
1117
        }
2✔
1118

1119
        // Finally, replay *ALL ADDS* in this forwarding package. The
1120
        // downstream logic is able to filter out any duplicates, but we must
1121
        // shove the entire, original set of adds down the pipeline so that the
1122
        // batch of adds presented to the sphinx router does not ever change.
1123
        if !fwdPkg.AckFilter.IsFull() {
4✔
1124
                l.processRemoteAdds(fwdPkg)
2✔
1125

2✔
1126
                // If the link failed during processing the adds, we must
2✔
1127
                // return to ensure we won't attempted to update the state
2✔
1128
                // further.
2✔
1129
                if l.failed {
2✔
1130
                        return fmt.Errorf("link failed while " +
×
1131
                                "processing remote adds")
×
1132
                }
×
1133
        }
1134

1135
        return nil
2✔
1136
}
1137

1138
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1139
// removes those that can be discarded. It is safe to do this entirely in the
1140
// background, since all state is coordinated on disk. This also ensures the
1141
// link can continue to process messages and interleave database accesses.
1142
//
1143
// NOTE: This MUST be run as a goroutine.
1144
func (l *channelLink) fwdPkgGarbager() {
2✔
1145
        defer l.cg.WgDone()
2✔
1146

2✔
1147
        l.cfg.FwdPkgGCTicker.Resume()
2✔
1148
        defer l.cfg.FwdPkgGCTicker.Stop()
2✔
1149

2✔
1150
        if err := l.loadAndRemove(); err != nil {
2✔
UNCOV
1151
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
UNCOV
1152
        }
×
1153

1154
        for {
4✔
1155
                select {
2✔
UNCOV
1156
                case <-l.cfg.FwdPkgGCTicker.Ticks():
×
UNCOV
1157
                        if err := l.loadAndRemove(); err != nil {
×
UNCOV
1158
                                l.log.Warnf("unable to remove fwd pkgs: %v",
×
UNCOV
1159
                                        err)
×
UNCOV
1160
                                continue
×
1161
                        }
1162
                case <-l.cg.Done():
2✔
1163
                        return
2✔
1164
                }
1165
        }
1166
}

// loadAndRemove loads all of the channel's forwarding packages and determines
// if they can be removed. It is called once before the FwdPkgGCTicker ticks so
// that a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        var removeHeights []uint64
        for _, fwdPkg := range fwdPkgs {
                if fwdPkg.State != channeldb.FwdStateCompleted {
                        continue
                }

                removeHeights = append(removeHeights, fwdPkg.Height)
        }

        // If removeHeights is empty, return early so we don't use a db
        // transaction.
        if len(removeHeights) == 0 {
                return nil
        }

        return l.channel.RemoveFwdPkgs(removeHeights...)
}

// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
        l.log.Warnf("error when syncing channel states: %v", err)

        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                l.log.Debugf("unable to sync channel states, link is " +
                        "shutting down")
                return

        // We failed syncing the commit chains, probably because the remote has
        // lost state. We should force close the channel.
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
                fallthrough

        // The remote sent us an invalid last commit secret, we should force
        // close the channel.
        // TODO(halseth): and permanently ban the peer?
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
                fallthrough

        // The remote sent us a commit point different from what they sent us
        // before.
        // TODO(halseth): ban peer?
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
                // We'll fail the link and tell the peer to force close the
                // channel. Note that the database state is not updated here;
                // it will be updated once the close transaction is ready, so
                // that we don't go down before the transaction is stored in
                // the db.
                l.failf(
                        LinkFailureError{
                                code:          ErrSyncError,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to synchronize channel states: %v", err,
                )

        // We have lost state and cannot safely force close the channel. Fail
        // the channel and wait for the remote to hopefully force close it. The
        // remote has sent us its latest unrevoked commitment point, and we'll
        // store it in the database, such that we can attempt to recover the
        // funds if the remote force closes the channel.
        case errors.As(err, &errDataLoss):
                err := l.channel.MarkDataLoss(
                        errDataLoss.CommitPoint,
                )
                if err != nil {
                        l.log.Errorf("unable to mark channel data loss: %v",
                                err)
                }

        // We determined the commit chains were not possible to sync. We
        // cautiously fail the channel, but don't force close.
        // TODO(halseth): can we safely force close in any cases where this
        // error is returned?
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
                if err := l.channel.MarkBorked(); err != nil {
                        l.log.Errorf("unable to mark channel borked: %v", err)
                }

        // Other, unspecified error.
        default:
        }

        l.failf(
                LinkFailureError{
                        code:          ErrRecoveryError,
                        FailureAction: LinkFailureForceNone,
                },
                "unable to synchronize channel states: %v", err,
        )
}

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
// the downstream channel managed by the channel link. In the event that an
// htlc needs to be forwarded, then the send-only forward handler is used which
// sends htlc packets to the switch. Additionally, this goroutine handles
// acting upon all timeouts for any active HTLCs, manages the channel's
// revocation window, and also the htlc trickle queue+timer for this active
// channel.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager(ctx context.Context) {
        defer func() {
                l.cfg.BatchTicker.Stop()
                l.cg.WgDone()
                l.log.Infof("exited")
        }()

        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

        // Notify any clients that the link is now in the switch via an
        // ActiveLinkEvent. We'll also defer an inactive link notification for
        // when the link exits to ensure that every active notification is
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // If this is not the first time the link has been started, we need to
        // take extra steps to resume its state.
        err := l.resumeLink(ctx)
        if err != nil {
                l.log.Errorf("resuming link failed: %v", err)
                return
        }

        // Now that we've received both channel_ready and channel_reestablish,
        // we can go ahead and send the active channel notification. We'll also
        // defer the inactive notification for when the link exits to ensure
        // that every active notification is matched by an inactive one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // Pause or resume the batch ticker.
                l.toggleBatchTicker()

                select {
                // We have a new hook that needs to be run when we reach a
                // clean channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
                        err := l.handleUpdateFee(ctx)
                        if err != nil {
                                l.log.Errorf("failed to handle update fee: "+
                                        "%v", err)
                        }

                // The underlying channel has notified us of a unilateral close
                // carried out by the remote peer. In the case of such an
                // event, we'll wipe the channel state from the peer, and mark
                // the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.failf(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(ctx, pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(ctx, msg)

                // An htlc resolution is received. This means that we now have
                // a resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        err := l.handleHtlcResolution(ctx, hodlItem)
                        if err != nil {
                                l.log.Errorf("failed to handle htlc "+
                                        "resolution: %v", err)
                        }

                // A user-initiated quiescence request is received. We now
                // forward it to the quiescer.
                case qReq := <-l.quiescenceReqs:
                        err := l.handleQuiescenceReq(qReq)
                        if err != nil {
                                l.log.Errorf("failed to handle quiescence "+
                                        "req: %v", err)
                        }

                case <-l.cg.Done():
                        return
                }
        }
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(ctx context.Context,
        firstResolution invoices.HtlcResolution) error {

        // Try to read all waiting resolution messages, so that they can all be
        // processed in a single commitment tx update.
        htlcResolution := firstResolution
loop:
        for {
                // Lookup all hodl htlcs that can be failed or settled with
                // this event. The hodl htlc must be present in the map.
                circuitKey := htlcResolution.CircuitKey()
                hodlHtlc, ok := l.hodlMap[circuitKey]
                if !ok {
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
                }

                err := l.processHtlcResolution(htlcResolution, hodlHtlc)
                if err != nil {
                        return err
                }

                // Clean up hodl map.
                delete(l.hodlMap, circuitKey)

                select {
                case item := <-l.hodlQueue.ChanOut():
                        htlcResolution = item.(invoices.HtlcResolution)

                // No need to process it if the link is broken.
                case <-l.cg.Done():
                        return ErrLinkShuttingDown

                default:
                        break loop
                }
        }

        // Update the commitment tx.
        if err := l.updateCommitTx(ctx); err != nil {
                return err
        }

        return nil
}
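
// Editor's sketch (not part of lnd): the non-blocking drain pattern used by
// processHodlQueue, generalized. A select with a default case empties
// whatever is already queued without blocking, so many resolutions collapse
// into a single commitment update rather than one signature round-trip each.
func exampleDrain[T any](first T, queue <-chan T, quit <-chan struct{},
        handle func(T) error) error {

        item := first
        for {
                if err := handle(item); err != nil {
                        return err
                }

                select {
                case item = <-queue:
                        // Another item was already waiting; keep draining.

                case <-quit:
                        return ErrLinkShuttingDown

                default:
                        // Queue drained; the caller now commits once for the
                        // whole batch.
                        return nil
                }
        }
}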

// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine the required action for the resolution based on the type
        // of resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
                )

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.add.Amount)

                l.sendHTLCError(
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
                        true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}

// getResolutionFailure returns the wire message that an htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
        amount lnwire.MilliSatoshi) *LinkError {

        // If the resolution has been resolved as part of a MPP timeout,
        // we need to fail the htlc with lnwire.FailMppTimeout.
        if resolution.Outcome == invoices.ResultMppTimeout {
                return NewDetailedLinkError(
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
                )
        }

        // If the htlc is not a MPP timeout, we fail it with
        // FailIncorrectDetails. This error is sent for invoice payment
        // failures such as underpayment or expiry too soon, and for hodl
        // invoices (which return FailIncorrectDetails to avoid leaking
        // information).
        incorrectDetails := lnwire.NewFailIncorrectDetails(
                amount, uint32(resolution.AcceptHeight),
        )

        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}
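
// Editor's note (sketch, not part of lnd): prand.Int63n panics when its
// argument is not positive, so the draw above implicitly assumes
// MaxUpdateTimeout > MinUpdateTimeout. With the defaults from this file
// (10m and 60m), each expiry of updateFeeTimer re-arms it with a fresh draw
// from [10m, 60m), de-synchronizing fee updates across links.
func exampleRandomFeeTimeout(minTimeout,
        maxTimeout time.Duration) time.Duration {

        lower, upper := int64(minTimeout), int64(maxTimeout)

        // Uniform over [lower, upper); panics if upper <= lower.
        return time.Duration(prand.Int63n(upper-lower) + lower)
}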

// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
        pkt *htlcPacket) error {

        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
        if !ok {
                return errors.New("not an UpdateAddHTLC packet")
        }

        // If we are flushing the link in the outgoing direction or we have
        // already sent Stfu, then we can't add new htlcs to the link and we
        // need to bounce it.
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        &lnwire.FailTemporaryChannelFailure{},
                        OutgoingFailureLinkNotEligible,
                )
        }

        // If hodl.AddOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding an ADD to the
        // mailbox, and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
                l.log.Warnf(hodl.AddOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())
                return nil
        }

        // Check if we can add the HTLC here without exceeding the max fee
        // exposure threshold.
        if l.isOverexposedWithHtlc(htlc, false) {
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
                        "exposure exceeded")

                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        // A new payment has been initiated via the downstream channel,
        // so we add the new HTLC to our local log, then update the
        // commitment chains.
        htlc.ChanID = l.ChanID()
        openCircuitRef := pkt.inKey()

        // We enforce the fee buffer for the commitment transaction because
        // we are in control of adding this htlc. Nothing has been locked in
        // yet, so we can safely enforce the fee buffer, which is only relevant
        // if we are the initiator of the channel.
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
        if err != nil {
                // The HTLC was unable to be added to the state machine,
                // as a result, we'll signal the switch to cancel the
                // pending payment.
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
                        err)

                // Remove this packet from the link's mailbox, this
                // prevents it from being reprocessed if the link
                // restarts and resets its mailbox. If this response
                // doesn't make it back to the originating link, it will
                // be rejected upon attempting to reforward the Add to
                // the switch, since the circuit was never fully opened,
                // and the forwarding package shows it as
                // unacknowledged.
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
                "local_log_index=%v, pend_updates=%v",
                htlc.PaymentHash[:], index,
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

        pkt.outgoingChanID = l.ShortChanID()
        pkt.outgoingHTLCID = index
        htlc.ID = index

        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

        err = l.cfg.Peer.SendMessage(false, htlc)
        if err != nil {
                l.log.Errorf("failed to send UpdateAddHTLC: %v", err)
        }

        // Send a forward event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
                newHtlcKey(pkt),
                HtlcInfo{
                        IncomingTimeLock: pkt.incomingTimeout,
                        IncomingAmt:      pkt.incomingAmount,
                        OutgoingTimeLock: htlc.Expiry,
                        OutgoingAmt:      htlc.Amount,
                },
                getEventType(pkt),
        )

        l.tryBatchUpdateCommitTx(ctx)

        return nil
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
        pkt *htlcPacket) {

        if pkt.htlc.MsgType().IsChannelUpdate() &&
                !l.quiescer.CanSendUpdates() {

                l.log.Warnf("unable to process channel update. "+
                        "ChannelID=%v is quiescent.", l.ChanID())

                return
        }

        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)

        case *lnwire.UpdateFulfillHTLC:
                l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc)

        case *lnwire.UpdateFailHTLC:
                l.processLocalUpdateFailHTLC(ctx, pkt, htlc)
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if pending < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail(ctx)
}
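
// Editor's note (illustrative, not part of lnd): two triggers can turn
// pending updates into a signature: the size threshold checked above, and
// the BatchTicker in htlcManager, which bounds the latency of batches that
// never fill. A hypothetical restatement of that decision:
func exampleShouldSign(pending uint64, batchSize uint32, tickFired bool) bool {
        // A full batch signs immediately; a partial batch waits for the
        // ticker so that quiet periods still flush queued updates.
        return pending >= uint64(batchSize) || (tickFired && pending > 0)
}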

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this
        // link from resending the packet to the switch. To do so, we ack the
        // AddRef of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal of
                // the destination reference or circuit, so we exit early. The
                // cleanup may proceed with a different packet in the future
                // that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to tear down the response reference and
        // circuit map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the circuit.
        // When retransmitting responses, the destination references will be
        // cleaned up if an open circuit is not found in the circuit map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%v", inKey)

        // With all known references acked, we can now safely delete the
        // circuit from the switch's circuit map, as the state is no longer
        // needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer with whom we
// have a direct channel, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
        msg lnwire.Message) {

        l.log.Tracef("received upstream msg %v, handling now...",
                msg.MsgType())
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())

        // First check if the message is an update and we are capable of
        // receiving updates right now.
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
                l.stfuFailf("update received after stfu: %T", msg)
                return
        }

        var err error

        switch msg := msg.(type) {
        case *lnwire.UpdateAddHTLC:
                err = l.processRemoteUpdateAddHTLC(msg)

        case *lnwire.UpdateFulfillHTLC:
                err = l.processRemoteUpdateFulfillHTLC(msg)

        case *lnwire.UpdateFailMalformedHTLC:
                err = l.processRemoteUpdateFailMalformedHTLC(msg)

        case *lnwire.UpdateFailHTLC:
                err = l.processRemoteUpdateFailHTLC(msg)

        case *lnwire.CommitSig:
                err = l.processRemoteCommitSig(ctx, msg)

        case *lnwire.RevokeAndAck:
                err = l.processRemoteRevokeAndAck(ctx, msg)

        case *lnwire.UpdateFee:
                err = l.processRemoteUpdateFee(msg)

        case *lnwire.Stfu:
                err = l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err)
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                l.processRemoteError(msg)

        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }

        if err != nil {
                l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
                        err)
        }
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}

// stfuFailf fails the link in the case where the requirements of the
// quiescence protocol are violated. In all cases we opt to drop the connection
// as only link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are zero updates originally
// issued by the given party (whose) still pending on either the local or
// remote commitment transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}
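
// Editor's sketch (not part of lnd): NumPendingUpdates is indexed by
// (originator, commitment), so each party has a row of two counters and
// noDanglingUpdates checks that one row is all zero. A full channel flush,
// by extension, needs all four cells to be zero:
func exampleChannelFullyFlushed(l *channelLink) bool {
        parties := []lntypes.ChannelParty{lntypes.Local, lntypes.Remote}
        for _, whose := range parties {
                // Each call covers both the local and remote commitment for
                // the given originator.
                if !l.noDanglingUpdates(whose) {
                        return false
                }
        }

        return true
}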

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the server, and for cleaning up any circuits
// closed by signing a previous commitment txn. This method ensures that the
// circuits are removed from the circuit map before removing them from the
// link's mailbox, otherwise it could be possible for some circuit to be missed
// if this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from being
        // replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets. We
        // batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now safe
        // to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}
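
// Editor's sketch (not part of lnd): the crash-safety argument behind the
// ordering above, extracted into a hypothetical helper. Durable circuit
// deletion must precede the volatile mailbox ack: if the process dies between
// the two steps, the packet is simply redelivered and the delete retried,
// whereas the reverse order could leave a circuit stranded in the map with no
// packet left to trigger its removal.
func exampleAckOrdering(circuits CircuitModifier, mailbox MailBox,
        closed []CircuitKey) error {

        // Step 1: delete from the durable circuit map first. Failing here
        // leaves both stores intact, so the operation is safely retryable.
        if err := circuits.DeleteCircuits(closed...); err != nil {
                return err
        }

        // Step 2: only now is it safe to stop redelivering the packets.
        for _, inKey := range closed {
                mailbox.AckPacket(inKey)
        }

        return nil
}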

// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
        err := l.updateCommitTx(ctx)
        switch {
        // No error encountered, success.
        case err == nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx(ctx context.Context) error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting
        // here permits testing of either the switch or link's ability to trim
        // circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        ctx, done := l.cg.Create(ctx)
        defer done()

        newCommit, err := l.channel.SignNextCommitment(ctx)
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes=%v", n,
                        lnutils.SpewLogClosure(l.openedCircuits),
                        lnutils.SpewLogClosure(l.closedCircuits))

                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (replacing the prior
        // remote pending set).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.cg.Done():
                return ErrLinkShuttingDown
        default:
        }

        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
        if err != nil {
                return fmt.Errorf("error parsing aux sigs: %w", err)
        }

        commitSig := &lnwire.CommitSig{
                ChanID:        l.ChanID(),
                CommitSig:     newCommit.CommitSig,
                HtlcSigs:      newCommit.HtlcSigs,
                PartialSig:    newCommit.PartialSig,
                CustomRecords: auxBlobRecords,
        }
        err = l.cfg.Peer.SendMessage(false, commitSig)
        if err != nil {
                l.log.Errorf("failed to send CommitSig: %v", err)
        }

        // Now that we have sent out a new CommitSig, we invoke the outgoing
        // set of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}

// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from
        // disk. This ensures that the channel state accurately reflects the
        // updated short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}
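
// Editor's sketch (not part of lnd): per BOLT #2, a channel ID is the funding
// txid with the 2-byte big-endian funding output index XORed into its final
// two bytes, which is what lnwire.NewChanIDFromOutPoint derives from the
// outpoint above. A stand-alone illustration, assuming the txid bytes are
// already in the ordering lnwire serializes:
func exampleChanIDFromOutPoint(fundingTxid [32]byte,
        outputIndex uint16) [32]byte {

        cid := fundingTxid
        cid[30] ^= byte(outputIndex >> 8) // High byte of the index.
        cid[31] ^= byte(outputIndex)      // Low byte of the index.

        return cid
}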

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

        return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis. This
// is used so that the Switch can have access to the commitment fee without
// needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
        if remote {
                return l.channel.State().RemoteCommitment.CommitFee
        }

        return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate, taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that, in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction, and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments using
        // this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}
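
// Editor's sketch (not part of lnd): the threshold test above reduces to the
// same inequality evaluated once per commitment at the proposed fee-rate:
// dustSum + commitFee > MaxFeeExposure. Extracted for a single commitment:
func exampleExceedsExposure(dustSum lnwire.MilliSatoshi,
        commitFee btcutil.Amount, maxFeeExposure lnwire.MilliSatoshi) bool {

        // Both trimmed (dust) HTLCs and the commitment fee are burned to
        // miner fees on a force-close, so they are summed into one exposure
        // figure before comparing against the configured threshold.
        return dustSum+lnwire.NewMSatFromSatoshis(commitFee) > maxFeeExposure
}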

// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// it determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds
// to the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum,
                        commitFeeMSat)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum,
                        commitFeeMSat)

                return true
        }

        return false
}
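
// Editor's worked example (illustrative numbers, not from the source): at a
// worst-case fee-rate of 2,500 sat/kw and input.HTLCWeight = 172 wu, a
// non-dust HTLC adds FeeForWeight(172) = 2500 * 172 / 1000 = 430 sats of fee
// exposure, whereas a 300-sat dust HTLC adds its full 300 sats to the dust
// sum instead. The per-HTLC delta in isolation:
func exampleHtlcExposureDelta(feeRate chainfee.SatPerKWeight,
        amt btcutil.Amount, isDust bool) lnwire.MilliSatoshi {

        if isDust {
                // A dust HTLC has no output, so its whole amount is burned to
                // fees on force-close and counts toward exposure directly.
                return lnwire.NewMSatFromSatoshis(amt)
        }

        // A non-dust HTLC adds an output, so only the extra commitment
        // weight it carries increases exposure.
        weight := lntypes.WeightUnit(input.HTLCWeight)

        return lnwire.NewMSatFromSatoshis(feeRate.FeeForWeight(weight))
}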
2333

2334
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
2335
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
2336
// the HTLC is incoming (i.e. one that the remote sent), a boolean denoting
2337
// whether to evaluate on the local or remote commit, and finally an HTLC
2338
// amount to test.
2339
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
2340
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
2341

2342
// dustHelper is used to construct the dustClosure.
2343
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
2344
        remoteDustLimit btcutil.Amount) dustClosure {
2✔
2345

2✔
2346
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
2✔
2347
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
4✔
2348

2✔
2349
                var dustLimit btcutil.Amount
2✔
2350
                if whoseCommit.IsLocal() {
4✔
2351
                        dustLimit = localDustLimit
2✔
2352
                } else {
4✔
2353
                        dustLimit = remoteDustLimit
2✔
2354
                }
2✔
2355

2356
                return lnwallet.HtlcIsDust(
2✔
2357
                        chantype, incoming, whoseCommit, feerate, amt,
2✔
2358
                        dustLimit,
2✔
2359
                )
2✔
2360
        }
2361

2362
        return isDust
2✔
2363
}
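
// Example (illustrative, not part of the original file): a minimal sketch of
// how a closure built by dustHelper is used. The channel type, dust limits,
// and feerate below are assumed placeholder values.
//
//      isDust := dustHelper(
//              channeldb.SingleFunderTweaklessBit, 354, 354,
//      )
//
//      // Evaluate an outgoing 400-satoshi HTLC on our local commitment at a
//      // feerate of 253 sat/kw. Once the second-level HTLC fee is
//      // subtracted, the output falls below the 354-satoshi dust limit, so
//      // the closure reports it as dust.
//      dust := isDust(
//              chainfee.SatPerKWeight(253), false, lntypes.Local,
//              btcutil.Amount(400),
//      )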

// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {

        l.Lock()
        l.cfg.FailAliasUpdate = closure
        l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
        l.Lock()
        l.mailBox = mailbox
        l.upstream = mailbox.MessageOutBox()
        l.downstream = mailbox.PacketOutBox()
        l.Unlock()

        // Set the mailbox's fee rate. This may be refreshing a feerate that
        // was never committed.
        l.mailBox.SetFeeRate(l.getFeeRate())

        // Also set the mailbox's dust closure so that it can query whether
        // HTLCs are dust given the current feerate.
        l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. We assume that
// fields that are zero are intentionally set to zero, so we'll use newPolicy
// to update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
        newPolicy models.ForwardingPolicy) {

        l.Lock()
        defer l.Unlock()

        l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing
        // fee this incoming HTLC must carry in order to satisfy the
        // constraints of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount.
        // However, rounding may cause the result to be slightly higher than
        // in the case of separately rounded fee components. This potentially
        // causes failed forwards for senders and is something to be avoided.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/dated
        // information to construct the forwarding information for this hop.
        // In any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up to date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our
        // time-lock delta should be at least the outgoing time lock.
        // Otherwise, either the sender messed up, or an intermediate node
        // tampered with the HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
                        "expected at least %v block delta, got %v block delta",
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}
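
// Example (illustrative, not part of the original file): a sketch of the fee
// check above with assumed policy numbers. The outbound and inbound
// components are each rounded on their own before being summed, rather than
// being folded into a single aggregate rate.
//
//      amtToForward := lnwire.MilliSatoshi(1_000_000)
//      outFee := ExpectedFee(policy, amtToForward)        // e.g. 1,001 msat
//      inFee := inboundFee.CalcFee(amtToForward + outFee) // e.g. -1 msat
//
//      incoming := lnwire.MilliSatoshi(1_001_100)
//      actualFee := int64(incoming) - int64(amtToForward) // 1,100 msat
//
//      // The HTLC is accepted when actualFee >= inFee + int64(outFee),
//      // i.e. 1,100 >= 1,000 in this example.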

// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback mechanism
        // to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // Validate HTLC amount against policy limits.
        linkErr := l.validateHtlcAmount(
                policy, payHash, amt, originalScid, customRecords,
        )
        if linkErr != nil {
                return linkErr
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)

                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()

        auxBandwidth, externalErr := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if externalErr != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v",
                        externalErr)

                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
                auxBandwidth.Bandwidth.WhenSome(
                        func(bandwidth lnwire.MilliSatoshi) {
                                availableBandwidth = bandwidth
                        },
                )
        }

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, availableBandwidth)
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)

                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}
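
// Example (illustrative, not part of the original file): the two expiry
// checks above bound the outgoing timeout to a window relative to the
// current height. With assumed config values:
//
//      heightNow := uint32(800_000)
//      rejectDelta := uint32(13) // assumed OutgoingCltvRejectDelta
//      maxDelta := uint32(2016)  // DefaultMaxOutgoingCltvExpiry
//
//      // Accepted: heightNow+rejectDelta < timeout <= heightNow+maxDelta,
//      // i.e. timeouts of 800,014 through 802,016 pass both checks.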

// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. This will always return fn.None() for a
// regular (non-custom) channel.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(OptionalBandwidth{
                        IsHandled: false,
                })
        }

        peerBytes := l.cfg.Peer.PubKey()

        peer, err := route.NewVertexFromBytes(peerBytes[:])
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to decode "+
                        "peer pub key: %v", err))
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
                l.channel.FetchLatestAuxHTLCView(), peer,
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(OptionalBandwidth{
                IsHandled: true,
                Bandwidth: fn.Some(auxBandwidth),
        })
}

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles the switch packets. These packets might be
// forwarded to us from another channel link in case the htlc update came from
// another peer, or if the update was created by a user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles the htlc requests (settle/add/fail) sent to us
// from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.cg.Done():
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
        feePerKw chainfee.SatPerKWeight) error {

        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the fee
        // threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock
        // it in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx(ctx)
}

// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                l.log.Trace("fwd package has no settle/fails to process, " +
                        "exiting early")

                return
        }

        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which will
                // handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failureCode message for a previously forwarded HTLC has
                // been received. As a result a new slot will be freed up in
                // our commitment state, so we'll forward this to the switch so
                // the backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal a
                        // free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the switch
                        // to an actual error, by encrypting it as if we were
                        // the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero number.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}
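
// Example (illustrative, not part of the original file): a converted failure
// reason carries no HMAC but still includes the two 2-byte length prefixes
// for the message and its padding, which is where the "+ 4" above comes
// from. A sketch of the two sizes, assuming the standard onion failure
// layout:
//
//      convertedSize := lnwire.FailureMessageLength + 4      // 260 bytes
//      normalSize := 32 + lnwire.FailureMessageLength + 4    // 292 bytes,
//      // i.e. the converted reason is exactly one 32-byte HMAC smaller.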

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
//
// NOTE: This function also needs to be called for fwd packages with no ADDs
// because it marks the fwdPkg as processed by writing the FwdFilter into the
// database.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        // decodeReqs is a list of requests sent to the onion decoder. We
        // expect the same number of responses to be returned.
        decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))

        // unackedAdds is a list of ADDs that are waiting for the remote's
        // settle/fail update.
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))

        for i, update := range fwdPkg.Adds {
                // If this index is already found in the ack filter, the
                // response to this forwarding decision has already been
                // committed by one of our commitment txns. ADDs in this state
                // are waiting for the rest of the fwding package to get acked
                // before being garbage collected.
                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(uint16(i)) {

                        continue
                }

                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function, which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                        unackedAdds = append(unackedAdds, msg)
                }
        }

        // If the fwdPkg has already been processed, it means we are
        // reforwarding the packets again, which happens only on a restart.
        reforward := fwdPkg.State == channeldb.FwdStateProcessed

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs, reforward,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range unackedAdds {
                idx := uint16(i)
                sourceRef := fwdPkg.SourceRef(idx)
                add := *update

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information with
                // the DecodeHopIterator function, which processes the Sphinx
                // packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop iterator "+
                                "for htlc(id=%v, hash=%x): %v", add.ID,
                                add.PaymentHash, failureCode)

                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:ll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit infrastructure
                        // is setup to handle TLV payloads, so testing this
                        // would require implementing a separate mock iterator
                        // for TLV payloads that also supports injecting invalid
                        // payloads. Deferring this non-trivial effort till a
                        // later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve the onion obfuscator from the onion blob in order
                // to produce the initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, "%v", err)

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route: %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If the
                        // fwd pkg has already been processed, then we entered
                        // the above section to recreate a previous error. If
                        // the packet had previously been forwarded, it would
                        // have been added to switchPackets at the top of this
                        // section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has not
        // been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
                len(switchPackets), reforward)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the
        // link. Failing to do this could cause reorderings/gaps in the range
        // of opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(reforward, switchPackets...)
}
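
// Example (illustrative, not part of the original file): a sketch of how the
// forwarding package states referenced above drive processRemoteAdds.
//
//      switch fwdPkg.State {
//      case channeldb.FwdStateLockedIn:
//              // First pass: validate each ADD, record forwarding decisions
//              // in FwdFilter, and hand the new packets to the switch.
//      case channeldb.FwdStateProcessed:
//              // Restart path: re-forward the ADDs whose FwdFilter bit is
//              // set, and re-run validation on the rest to reproduce their
//              // original errors.
//      case channeldb.FwdStateCompleted:
//              // Fully acked; nothing left to do.
//      }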

// experimentalEndorsement returns the value to set for our outgoing
// experimental endorsement field, or fn.None if the field should not be
// populated on the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay the experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field, so consider it invalid
        // if it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}
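
// Example (illustrative, not part of the original file): the relay collapses
// every incoming signal to one of two outgoing values. Assuming we are within
// the experiment period:
//
//      in := record.CustomSet{
//              uint64(lnwire.ExperimentalEndorsementType): {
//                      lnwire.ExperimentalEndorsed,
//              },
//      }
//      out := l.experimentalEndorsement(in)
//      // out == fn.Some(lnwire.ExperimentalEndorsed); every other input
//      // (absent, empty, or invalid) maps to ExperimentalUnendorsed.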
3382

// processExitHop handles an htlc for which this link is the exit hop. It
// returns a non-nil error only for internal failures; HTLCs that fail
// validation are failed back to the sender instead.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
        fwdInfo hop.ForwardingInfo, heightNow uint32,
        payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final hop's
        // ADD, nor will we settle the corresponding invoice or respond with the
        // preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)

                return nil
        }

        // In case the traffic shaper is active, we'll check if the HTLC has
        // custom records and skip the amount check in the onion payload below.
        isCustomHTLC := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) bool {
                        return ts.IsCustomHTLC(add.CustomRecords)
                },
        )

        // As we're the exit hop, we'll double check the hop-payload included in
        // the HTLC to ensure that it was crafted correctly by the sender and
        // is compatible with the HTLC we were extended. If an external
        // validator is active we might bypass the amount check.
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v",
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
                )
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if add.Expiry < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
                )

                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(add.PaymentHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: add.ID,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide whether it is resolved now or
        // later.
        htlc := hodlHtlc{
                add:        add,
                sourceRef:  sourceRef,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}

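// exitHopChecksOK is an illustrative sketch added for exposition (it is not
// part of the upstream file). It restates the two validations performed in
// processExitHop above: the HTLC must carry at least the amount the onion
// payload claims, and its expiry must not fall below the payload's outgoing
// CLTV value. The sender may overpay or over-commit time, but never under.
func exitHopChecksOK(amt lnwire.MilliSatoshi, expiry uint32,
        fwd hop.ForwardingInfo) bool {

        return amt >= fwd.AmountToForward && expiry >= fwd.OutgoingCTLV
}
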
// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally, so we now send a
        // notification about it to the remote peer.
        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
        }

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

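// preimageMatches is an illustrative sketch added for exposition (not part
// of the upstream file). Settling is only safe because the preimage commits
// to the payment hash; this restates that relationship using the Matches
// helper that lntypes provides on Preimage.
func preimageMatches(preimage lntypes.Preimage, payHash lntypes.Hash) bool {
        return preimage.Matches(payHash)
}
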
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}

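// filterBufferedPackets is an illustrative sketch added for exposition (not
// part of the upstream file). It isolates the dedup filter used above: any
// packet whose inbound circuit key already has a response buffered in the
// mailbox is dropped before the batch is handed to the switch.
func filterBufferedPackets(mb MailBox, pkts []*htlcPacket) []*htlcPacket {
        filtered := make([]*htlcPacket, 0, len(pkts))
        for _, pkt := range pkts {
                if mb.HasPacket(pkt.inKey()) {
                        continue
                }
                filtered = append(filtered, pkt)
        }

        return filtered
}
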
// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
//   - Forwarding nodes must switch out any errors with MalformedFailHTLC.
//   - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // a UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %d", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}

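// blindedFailureWireMsg is an illustrative sketch added for exposition (not
// part of the upstream file). It summarizes the decision table implemented
// above: which wire message each role uses when failing an HTLC backwards.
func blindedFailureWireMsg(e hop.ErrorEncrypter) string {
        switch {
        // Plain (or locally sourced) hop: relay the original opaque reason.
        case e == nil || !e.Type().IsBlinded():
                return "UpdateFailHTLC with the original reason"

        // Introduction node: present an invalid_blinding error as our own.
        case e.Type() == hop.EncrypterTypeIntroduction:
                return "UpdateFailHTLC carrying encrypted invalid_blinding"

        // Relaying node inside the blinded route.
        default:
                return "UpdateFailMalformedHTLC with CodeInvalidBlinding"
        }
}
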
// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err)
        }
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}

// handleHtlcResolution takes an HTLC resolution and processes it by draining
// the hodlQueue. Once processed, a commit_sig is sent to the remote to update
// their commitment.
func (l *channelLink) handleHtlcResolution(ctx context.Context,
        hodlItem any) error {

        htlcResolution, ok := hodlItem.(invoices.HtlcResolution)
        if !ok {
                return fmt.Errorf("expected HtlcResolution, got %T", hodlItem)
        }

        err := l.processHodlQueue(ctx, htlcResolution)
        // No error, success.
        if err == nil {
                return nil
        }

        switch {
        // If the duplicate keystone error was encountered, fail back
        // gracefully.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(
                        LinkFailureError{
                                code: ErrCircuitError,
                        },
                        "process hodl queue: temporary circuit error: %v", err,
                )

        // Send an Error message to the peer.
        default:
                l.failf(
                        LinkFailureError{
                                code: ErrInternalError,
                        },
                        "process hodl queue: unable to update commitment: %v",
                        err,
                )
        }

        return err
}

// handleQuiescenceReq takes a locally initialized (RPC) quiescence request and
// forwards it to the quiescer for further processing.
func (l *channelLink) handleQuiescenceReq(req StfuReq) error {
        l.quiescer.InitStfu(req)

        if !l.noDanglingUpdates(lntypes.Local) {
                return nil
        }

        err := l.quiescer.SendOwedStfu()
        if err != nil {
                l.stfuFailf("SendOwedStfu: %v", err)
                res := fn.Err[lntypes.ChannelParty](err)
                req.Resolve(res)
        }

        return err
}

// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to
// decide whether we should send an `update_fee` msg to update the commitment's
// feerate.
func (l *channelLink) handleUpdateFee(ctx context.Context) error {
        // If we're not the initiator of the channel, we don't control the fees,
        // so we can ignore this.
        if !l.channel.IsInitiator() {
                return nil
        }

        // If we are the initiator, then we'll sample the current fee rate to
        // get into the chain within 3 blocks.
        netFee, err := l.sampleNetworkFee()
        if err != nil {
                return fmt.Errorf("unable to sample network fee: %w", err)
        }

        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

        newCommitFee := l.channel.IdealCommitFeeRate(
                netFee, minRelayFee,
                l.cfg.MaxAnchorsCommitFeeRate,
                l.cfg.MaxFeeAllocation,
        )

        // We determine if we should adjust the commitment fee based on the
        // current commitment fee, the suggested new commitment fee and the
        // current minimum relay fee rate.
        commitFee := l.channel.CommitFeeRate()
        if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) {
                return nil
        }

        // If we do, then we'll send a new UpdateFee message to the remote
        // party, to be locked in with a new update.
        err = l.updateChannelFee(ctx, newCommitFee)
        if err != nil {
                return fmt.Errorf("unable to update fee rate: %w", err)
        }

        return nil
}

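// feeAdjustmentNeeded is an illustrative sketch added for exposition (not
// part of the upstream file; the real policy lives in shouldAdjustCommitFee
// elsewhere in this file and may differ in its thresholds). It captures the
// shape of the decision above: always chase a higher network feerate, only
// chase a lower one when the gap is material, and never sit below the
// minimum relay feerate.
func feeAdjustmentNeeded(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // The network feerate rose above what we currently pay: update.
        case netFee > chanFee:
                return true

        // The network feerate dropped noticeably (here: more than 10%):
        // update so we stop overpaying.
        case netFee < chanFee-chanFee/10:
                return true

        // Our commitment would no longer be relayed: update.
        case chanFee < minRelayFee:
                return true

        default:
                return false
        }
}
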
// toggleBatchTicker checks whether we need to resume or pause the batch ticker.
// When we have no pending updates, the ticker is paused, otherwise resumed.
func (l *channelLink) toggleBatchTicker() {
        // If the previous event resulted in a non-empty batch, resume the batch
        // ticker so that it can be cleared. Otherwise pause the ticker to
        // prevent waking up the htlcManager while the batch is empty.
        numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if numUpdates > 0 {
                l.cfg.BatchTicker.Resume()
                l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+
                        "Remote)=%d", numUpdates)

                return
        }

        l.cfg.BatchTicker.Pause()
        l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
                "(Local, Remote)")
}

// resumeLink is called when starting a previous link. It will go through the
// reestablishment protocol and re-forward any packets that are not yet
// resolved.
func (l *channelLink) resumeLink(ctx context.Context) error {
        // If this isn't the first time that this channel link has been created,
        // then we'll need to check to see if we need to re-synchronize state
        // with the remote peer. settledHtlcs is a map of HTLC's that we
        // re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates(ctx)
                if err != nil {
                        l.handleChanSyncErr(err)

                        return err
                }
        }

        // If a shutdown message has previously been sent on this link, then we
        // need to make sure that we have disabled any HTLC adds on the outgoing
        // direction of the link and that we re-send the same shutdown message
        // that we previously sent.
        //
        // TODO(yy): we should either move this to chanCloser, or move all
        // shutdown handling logic to be managed by the link, but not a mix of
        // partial management by two subsystems.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since syncChanStates
                // would have sent any outstanding CommitSig, it is fine for us
                // to immediately queue the shutdown message now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })

        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // With the channel states synced, we now reset the mailbox to ensure we
        // start processing all unacked packets in order. This is done here to
        // ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull unacked
        // packets after starting to read from the downstream mailbox.
        err := l.mailBox.ResetPackets()
        if err != nil {
                l.log.Errorf("failed to reset packets: %v", err)
        }

        // If the channel is pending, there's no need to reforward packets.
        if l.ShortChanID() == hop.Source {
                return nil
        }

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We will
        // only attempt to resolve packages if our short chan id indicates that
        // the channel is not pending, otherwise we should have no htlcs to
        // reforward.
        err = l.resolveFwdPkgs(ctx)
        switch {
        // No error was encountered, success.
        case err == nil:
                // With our link's in-memory state fully reconstructed, spawn a
                // goroutine to manage the reclamation of disk space occupied by
                // completed forwarding packages.
                l.cg.WgAdd(1)
                go l.fwdPkgGarbager()

                return nil

        // If the duplicate keystone error was encountered, we'll fail without
        // sending an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)

        // A non-nil error was encountered, send an Error message to
        // the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to resolve fwd pkgs: %v", err)
        }

        return err
}

// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote
// and processes it.
func (l *channelLink) processRemoteUpdateAddHTLC(
        msg *lnwire.UpdateAddHTLC) error {

        if l.IsFlushing(Incoming) {
                // This is forbidden by the protocol specification. The best
                // chance we have to deal with this is to drop the connection.
                // This should roll back the channel state to the last
                // CommitSig. If the remote has already sent a CommitSig we
                // haven't received yet, channel state will be re-synchronized
                // with a ChannelReestablish message upon reconnection and the
                // protocol state that caused us to flush the link will be
                // rolled back. In the event that there was some
                // non-deterministic behavior in the remote that caused them to
                // violate the protocol, we have a decent shot at correcting it
                // this way, since reconnecting will put us in the cleanest
                // possible state to try again.
                //
                // In addition to the above, it is possible for us to hit this
                // case in situations where we improperly handle message
                // ordering due to concurrency choices. An issue has been filed
                // to address this here:
                // https://github.com/lightningnetwork/lnd/issues/8393
                err := errors.New("received add while link is flushing")
                l.failf(
                        LinkFailureError{
                                code:             ErrInvalidUpdate,
                                FailureAction:    LinkFailureDisconnect,
                                PermanentFailure: false,
                                Warning:          true,
                        }, "%v", err,
                )

                return err
        }

        // Disallow htlcs with blinding points set if we haven't enabled the
        // feature. This saves us from having to process the onion at all, but
        // will only catch blinded payments where we are a relaying node (as the
        // blinding point will be in the payload when we're the introduction
        // node).
        if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
                err := errors.New("blinding point included when route " +
                        "blinding is disabled")

                l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err)

                return err
        }

        // We have to check the limit here rather than later in the switch
        // because the counterparty can keep sending HTLC's without sending a
        // revoke. This would mean that the switch check would only occur later.
        if l.isOverexposedWithHtlc(msg, true) {
                err := errors.New("peer sent us an HTLC that exceeded our " +
                        "max fee exposure")
                l.failf(LinkFailureError{code: ErrInternalError}, "%v", err)

                return err
        }

        // We just received an add request from an upstream peer, so we add it
        // to our state machine, then add the HTLC to our "settle" list in the
        // event that we know the preimage.
        index, err := l.channel.ReceiveHTLC(msg)
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream add HTLC: %v", err)

                return err
        }

        l.log.Tracef("receive upstream htlc with payment hash(%x), "+
                "assigning index: %v", msg.PaymentHash[:], index)

        return nil
}

// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFulfillHTLC(
        msg *lnwire.UpdateFulfillHTLC) error {

        pre := msg.PaymentPreimage
        idx := msg.ID

        // Before we pipeline the settle, we'll check the set of active htlc's
        // to see if the related UpdateAddHTLC has been fully locked-in.
        var lockedin bool
        htlcs := l.channel.ActiveHtlcs()
        for _, add := range htlcs {
                // The HTLC will be outgoing and match idx.
                if !add.Incoming && add.HtlcIndex == idx {
                        lockedin = true
                        break
                }
        }

        if !lockedin {
                err := errors.New("unable to handle upstream settle")
                l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err)

                return err
        }

        if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidUpdate,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to handle upstream settle HTLC: %v", err,
                )

                return err
        }

        settlePacket := &htlcPacket{
                outgoingChanID: l.ShortChanID(),
                outgoingHTLCID: idx,
                htlc: &lnwire.UpdateFulfillHTLC{
                        PaymentPreimage: pre,
                },
        }

        // Add the newly discovered preimage to our growing list of uncommitted
        // preimages. These will be written to the witness cache just before
        // accepting the next commitment signature from the remote peer.
        l.uncommittedPreimages = append(l.uncommittedPreimages, pre)

        // Pipeline this settle, send it to the switch.
        go l.forwardBatch(false, settlePacket)

        return nil
}

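// settleTargetsLockedInAdd is an illustrative sketch added for exposition
// (not part of the upstream file). It isolates the locked-in check above: a
// settle is only acceptable for an HTLC we offered (outgoing) whose add
// appears in the active set, i.e. it has been committed on both sides.
func settleTargetsLockedInAdd(htlcs []channeldb.HTLC, idx uint64) bool {
        for _, add := range htlcs {
                if !add.Incoming && add.HtlcIndex == idx {
                        return true
                }
        }

        return false
}
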
// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg
// sent from the remote and processes it.
func (l *channelLink) processRemoteUpdateFailMalformedHTLC(
        msg *lnwire.UpdateFailMalformedHTLC) error {

        // Convert the failure type encoded within the HTLC fail message to the
        // proper generic lnwire error code.
        var failure lnwire.FailureMessage
        switch msg.FailureCode {
        case lnwire.CodeInvalidOnionVersion:
                failure = &lnwire.FailInvalidOnionVersion{
                        OnionSHA256: msg.ShaOnionBlob,
                }
        case lnwire.CodeInvalidOnionHmac:
                failure = &lnwire.FailInvalidOnionHmac{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionKey:
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        // Handle malformed errors that are part of a blinded route. This case
        // is slightly different, because we expect every relaying node in the
        // blinded portion of the route to send malformed errors. If we're also
        // a relaying node, we're likely going to switch this error out anyway
        // for our own malformed error, but we handle the case here for
        // completeness.
        case lnwire.CodeInvalidBlinding:
                failure = &lnwire.FailInvalidBlinding{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        default:
                l.log.Warnf("unexpected failure code received in "+
                        "UpdateFailMalformedHTLC: %v", msg.FailureCode)

                // We don't just pass back the error we received from our
                // successor. Otherwise we might report a failure that penalizes
                // us more than needed. If the onion that we forwarded was
                // correct, the node should have been able to send back its own
                // failure. The node did not send back its own failure, so we
                // assume there was a problem with the onion and report that
                // back. We reuse the invalid onion key failure because there is
                // no specific error for this case.
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }
        }

        // With the error parsed, we'll convert it into its opaque form.
        var b bytes.Buffer
        if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                return fmt.Errorf("unable to encode malformed error: %w", err)
        }

        // If the remote side was unable to parse the onion blob we sent it,
        // then we transform the malformed HTLC message into the usual HTLC
        // fail message.
        err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}

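// malformedToWireFailure is an illustrative sketch added for exposition (not
// part of the upstream file). It restates the mapping above: the peer's
// numeric failure code is lifted into a structured lnwire failure that
// carries the SHA256 of the onion blob we originally forwarded, with unknown
// codes deliberately reported as an invalid onion key.
func malformedToWireFailure(code lnwire.FailCode,
        sha [sha256.Size]byte) lnwire.FailureMessage {

        switch code {
        case lnwire.CodeInvalidOnionVersion:
                return &lnwire.FailInvalidOnionVersion{OnionSHA256: sha}

        case lnwire.CodeInvalidOnionHmac:
                return &lnwire.FailInvalidOnionHmac{OnionSHA256: sha}

        case lnwire.CodeInvalidBlinding:
                return &lnwire.FailInvalidBlinding{OnionSHA256: sha}

        default:
                // Covers CodeInvalidOnionKey as well as any unexpected code.
                return &lnwire.FailInvalidOnionKey{OnionSHA256: sha}
        }
}
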
// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFailHTLC(
        msg *lnwire.UpdateFailHTLC) error {

        // Verify that the failure reason is at least 256 bytes plus overhead.
        const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32

        if len(msg.Reason) < minimumFailReasonLength {
                // We've received a reason with a non-compliant length. Older
                // nodes happily relay back these failures that may originate
                // from a node further downstream. Therefore we can't just fail
                // the channel.
                //
                // We want to be compliant ourselves, so we also can't pass back
                // the reason unmodified. And we must make sure that we don't
                // hit the magic length check of 260 bytes in
                // processRemoteSettleFails either.
                //
                // Because the reason is unreadable for the payer anyway, we
                // just replace it by a compliant-length series of random bytes.
                msg.Reason = make([]byte, minimumFailReasonLength)
                _, err := crand.Read(msg.Reason[:])
                if err != nil {
                        return fmt.Errorf("random generation error: %w", err)
                }
        }

        // Add fail to the update log.
        idx := msg.ID
        err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}

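// compliantFailReason is an illustrative sketch added for exposition (not
// part of the upstream file). The minimum length used above is the 256-byte
// failure message plus the 2+2+32 bytes of overhead added in the constant,
// i.e. 292 bytes in total; a non-compliant reason is replaced with random
// bytes of exactly that length, since the payer can't read it anyway.
func compliantFailReason() ([]byte, error) {
        const minLen = lnwire.FailureMessageLength + 2 + 2 + 32

        reason := make([]byte, minLen)
        if _, err := crand.Read(reason); err != nil {
                return nil, err
        }

        return reason, nil
}
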
// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteCommitSig(ctx context.Context,
        msg *lnwire.CommitSig) error {

        // Since we may have learned new preimages for the first time, we'll add
        // them to our preimage cache. By doing this, we ensure any contested
        // contracts watched by any on-chain arbitrators can now sweep this HTLC
        // on-chain. We delay committing the preimages until just before
        // accepting the new remote commitment, as afterwards the peer won't
        // resend the Settle messages on the next channel reestablishment. Doing
        // so allows us to more effectively batch this operation, instead of
        // doing a single write per preimage.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInternalError},
                        "unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err,
                )

                return err
        }

        // Instead of truncating the slice to conserve memory allocations, we
        // simply set the uncommitted preimage slice to nil so that a new one
        // will be initialized if any more witnesses are discovered. We do this
        // because the maximum size that the slice can occupy is 15KB, and we
        // want to ensure we release that memory back to the runtime.
        l.uncommittedPreimages = nil

        // We just received new updates to our local commitment chain, so we'll
        // validate this new commitment, closing the link if invalid.
        auxSigBlob, err := msg.CustomRecords.Serialize()
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInvalidCommitment},
                        "unable to serialize custom records: %v", err,
                )

                return err
        }
        err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                CommitSig:  msg.CommitSig,
                HtlcSigs:   msg.HtlcSigs,
                PartialSig: msg.PartialSig,
                AuxSigBlob: auxSigBlob,
        })
        if err != nil {
                // If we were unable to reconstruct their proposed commitment,
                // then we'll examine the type of error. If it's an
                // InvalidCommitSigError, then we'll send a direct error.
                var sendData []byte
                switch {
                case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err):
                        sendData = []byte(err.Error())
                case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err):
                        sendData = []byte(err.Error())
                }
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidCommitment,
                                FailureAction: LinkFailureForceClose,
                                SendData:      sendData,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As we've just accepted a new state, we'll now immediately send the
        // remote peer a revocation for our prior state.
        nextRevocation, currentHtlcs, finalHTLCs, err :=
                l.channel.RevokeCurrentCommitment()
        if err != nil {
                l.log.Errorf("unable to revoke commitment: %v", err)

                // We need to fail the channel in case revoking our local
                // commitment does not succeed. We might have already advanced
                // our channel state which would lead us to proceed with an
                // unclean state.
                //
                // NOTE: We do not trigger a force close because this could
                // resolve itself in case our db was just busy not accepting new
                // transactions.
                l.failf(
                        LinkFailureError{
                                code:          ErrInternalError,
                                Warning:       true,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As soon as we are ready to send our next revocation, we can invoke
        // the incoming commit hooks.
        l.Lock()
        l.incomingCommitHooks.invoke()
        l.Unlock()

        err = l.cfg.Peer.SendMessage(false, nextRevocation)
        if err != nil {
                l.log.Errorf("failed to send RevokeAndAck: %v", err)
        }

        // Notify the incoming htlcs for which the resolutions were locked in.
        for id, settled := range finalHTLCs {
                l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                        models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: id,
                        },
                        channeldb.FinalHtlcInfo{
                                Settled:  settled,
                                Offchain: true,
                        },
                )
        }

        // Since we just revoked our commitment, we may have a new set of HTLC's
        // on our commitment, so we'll send them using our function closure
        // NotifyContractUpdate.
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.LocalHtlcSet,
                Htlcs:   currentHtlcs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If the remote party initiated the state transition, we'll reply with
        // a signature to provide them with their version of the latest
        // commitment. Otherwise, if both commitment chains are fully synced
        // from our PoV, we don't need to reply with a signature, as both sides
        // already have a commitment with the latest state accepted.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // If we need to send out an Stfu, this would be the time to do so.
        if l.noDanglingUpdates(lntypes.Local) {
                err = l.quiescer.SendOwedStfu()
                if err != nil {
                        l.stfuFailf("sendOwedStfu: %v", err)
                }
        }

        // Now that we have finished processing the incoming CommitSig and sent
        // out our RevokeAndAck, we invoke the flushHooks if the channel state
        // is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}

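// commitSigDance is an illustrative sketch added for exposition (not part of
// the upstream file). It abstracts the happy-path ordering the handler above
// follows for a remote-initiated state transition: accept the incoming
// CommitSig, revoke our prior state, and sign back only if we still owe the
// remote a commitment of their own.
func commitSigDance(recvSig, revoke func() error, oweCommit func() bool,
        sendSig func() error) error {

        if err := recvSig(); err != nil {
                return err
        }
        if err := revoke(); err != nil {
                return err
        }
        if oweCommit() {
                return sendSig()
        }

        return nil
}
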
// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context,
        msg *lnwire.RevokeAndAck) error {

        // We've received a revocation from the remote chain, if valid, this
        // moves the remote chain forward, and expands our revocation window.

        // We now process the message and advance our remote commit chain.
        fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
        if err != nil {
                // TODO(halseth): force close?
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidRevocation,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "unable to accept revocation: %v", err,
                )

                return err
        }

        // The remote party now has a new primary commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemoteHtlcSet,
                Htlcs:   remoteHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If we have a tower client for this channel type, we'll create a
        // backup for the current state.
        if l.cfg.TowerClient != nil {
                state := l.channel.State()
                chanID := l.ChanID()

                err = l.cfg.TowerClient.BackupState(
                        &chanID, state.RemoteCommitment.CommitHeight-1,
                )
                if err != nil {
                        l.failf(LinkFailureError{
                                code: ErrInternalError,
                        }, "unable to queue breach backup: %v", err)

                        return err
                }
        }

        // If we can send updates then we can process adds in case we are the
        // exit hop and need to send back resolutions, or in case there are
        // validity issues with the packets. Otherwise we defer the action until
        // resume.
        //
        // We are free to process the settles and fails without this check since
        // processing those can't result in further updates to this channel
        // link.
        if l.quiescer.CanSendUpdates() {
                l.processRemoteAdds(fwdPkg)
        } else {
                l.quiescer.OnResume(func() {
                        l.processRemoteAdds(fwdPkg)
                })
        }
        l.processRemoteSettleFails(fwdPkg)

        // If the link failed during processing the adds, we must return to
        // ensure we won't attempt to update the state further.
        if l.failed {
                return nil
        }

        // The revocation window opened up. If there are pending local updates,
        // try to update the commit tx. Pending updates could already have been
        // present because of a previously failed update to the commit tx or
        // freshly added in by processRemoteAdds. Also in case there are no
        // local updates, but there are still remote updates that are not in the
        // remote commit tx yet, send out an update.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // Now that we have finished processing the RevokeAndAck, we can invoke
        // the flushHooks if the channel state is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}

// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
        // Check and see if their proposed fee-rate would make us exceed the fee
        // threshold.
        fee := chainfee.SatPerKWeight(msg.FeePerKw)

        isDust, err := l.exceedsFeeExposureLimit(fee)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                l.log.Errorf("Unable to determine if fee threshold " +
                        "exceeded")
                l.failf(LinkFailureError{code: ErrInternalError},
                        "error calculating fee exposure: %v", err)

                return err
        }

        if isDust {
                // The proposed fee-rate makes us exceed the fee threshold.
                l.failf(LinkFailureError{code: ErrInternalError},
                        "fee threshold exceeded: %v", err)
                return err
        }

        // We received a fee update from the peer. If we are the initiator we
        // will fail the channel, if not we will apply the update.
        if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "error receiving fee update: %v", err)
                return err
        }

        // Update the mailbox's feerate as well.
        l.mailBox.SetFeeRate(fee)

        return nil
}

// processRemoteError takes an `Error` msg sent from the remote and fails the
// channel link.
func (l *channelLink) processRemoteError(msg *lnwire.Error) {
        // Error received from remote, MUST fail channel, but should only print
        // the contents of the error message if all characters are printable
        // ASCII.
        l.failf(
                // TODO(halseth): we currently don't fail the channel
                // permanently, as there are some sync issues with other
                // implementations that will lead to them sending an
                // error message, but we can recover from on next
                // connection. See
                // https://github.com/ElementsProject/lightning/issues/4212
                LinkFailureError{
                        code:             ErrRemoteError,
                        PermanentFailure: false,
                },
                "ChannelPoint(%v): received error from peer: %v",
                l.channel.ChannelPoint(), msg.Error(),
        )
}

4574
// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local and
4575
// processes it.
4576
func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
4577
        pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {
2✔
4578

2✔
4579
        // If hodl.SettleOutgoing mode is active, we exit early to simulate
2✔
4580
        // arbitrary delays between the switch adding the SETTLE to the mailbox,
2✔
4581
        // and the HTLC being added to the commitment state.
2✔
4582
        if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
2✔
4583
                l.log.Warnf(hodl.SettleOutgoing.Warning())
×
4584
                l.mailBox.AckPacket(pkt.inKey())
×
4585

×
4586
                return
×
4587
        }
×
4588

4589
        // An HTLC we forward to the switch has just settled somewhere upstream.
4590
        // Therefore we settle the HTLC within the our local state machine.
4591
        inKey := pkt.inKey()
2✔
4592
        err := l.channel.SettleHTLC(
2✔
4593
                htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
2✔
4594
                pkt.destRef, &inKey,
2✔
4595
        )
2✔
4596
        if err != nil {
2✔
4597
                l.log.Errorf("unable to settle incoming HTLC for "+
×
4598
                        "circuit-key=%v: %v", inKey, err)
×
4599

×
4600
                // If the HTLC index for Settle response was not known to our
×
4601
                // commitment state, it has already been cleaned up by a prior
×
4602
                // response. We'll thus try to clean up any lingering state to
×
4603
                // ensure we don't continue reforwarding.
×
4604
                if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
×
4605
                        l.cleanupSpuriousResponse(pkt)
×
4606
                }
×
4607

4608
                // Remove the packet from the link's mailbox to ensure it
4609
                // doesn't get replayed after a reconnection.
4610
                l.mailBox.AckPacket(inKey)
×
4611

×
4612
                return
×
4613
        }
4614

4615
        l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
2✔
4616
                pkt.inKey(), pkt.outKey())
2✔
4617

2✔
4618
        l.closedCircuits = append(l.closedCircuits, pkt.inKey())
2✔
4619

2✔
4620
        // With the HTLC settled, we'll need to populate the wire message to
2✔
4621
        // target the specific channel and HTLC to be canceled.
2✔
4622
        htlc.ChanID = l.ChanID()
2✔
4623
        htlc.ID = pkt.incomingHTLCID
2✔
4624

2✔
4625
        // Then we send the HTLC settle message to the connected peer so we can
2✔
4626
        // continue the propagation of the settle message.
2✔
4627
        err = l.cfg.Peer.SendMessage(false, htlc)
2✔
4628
        if err != nil {
2✔
4629
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
×
4630
        }
×
4631

4632
        // Send a settle event notification to htlcNotifier.
4633
        l.cfg.HtlcNotifier.NotifySettleEvent(
2✔
4634
                newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
2✔
4635
        )
2✔
4636

2✔
4637
        // Immediately update the commitment tx to minimize latency.
2✔
4638
        l.updateCommitTxOrFail(ctx)
2✔
4639
}
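
// A minimal sketch of how the settle message above is addressed before it
// goes out on the wire: the switch-level packet identifies the incoming
// channel and HTLC, and those identifiers are copied onto the message. The
// newFulfillMsg helper is hypothetical, for illustration only; the real
// flow mutates the htlc passed into processLocalUpdateFulfillHTLC in place.
func newFulfillMsg(chanID lnwire.ChannelID, htlcID uint64,
	preimage [32]byte) *lnwire.UpdateFulfillHTLC {

	return &lnwire.UpdateFulfillHTLC{
		// The channel over which the HTLC was originally offered.
		ChanID: chanID,

		// The index of the HTLC on that channel's commitment.
		ID: htlcID,

		// The preimage proving the payment completed downstream.
		PaymentPreimage: preimage,
	}
}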

// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local node
// and processes it.
func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
	pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {

	// If hodl.FailOutgoing mode is active, we exit early to simulate
	// arbitrary delays between the switch adding a FAIL to the mailbox,
	// and the HTLC being added to the commitment state.
	if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
		l.log.Warnf(hodl.FailOutgoing.Warning())
		l.mailBox.AckPacket(pkt.inKey())

		return
	}

	// An HTLC cancellation has been triggered somewhere upstream, so
	// we'll remove the HTLC from our local state machine.
	inKey := pkt.inKey()
	err := l.channel.FailHTLC(
		pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef,
		&inKey,
	)
	if err != nil {
		l.log.Errorf("unable to cancel incoming HTLC for "+
			"circuit-key=%v: %v", inKey, err)

		// If the HTLC index for the Fail response was not known to
		// our commitment state, it has already been cleaned up by a
		// prior response. We'll thus try to clean up any lingering
		// state to ensure we don't continue reforwarding.
		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
			l.cleanupSpuriousResponse(pkt)
		}

		// Remove the packet from the link's mailbox to ensure it
		// doesn't get replayed after a reconnection.
		l.mailBox.AckPacket(inKey)

		return
	}

	l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
		pkt.inKey(), pkt.outKey())

	l.closedCircuits = append(l.closedCircuits, pkt.inKey())

	// With the HTLC removed, we'll need to populate the wire message to
	// target the specific channel and HTLC to be canceled. The "Reason"
	// field will have already been set within the switch.
	htlc.ChanID = l.ChanID()
	htlc.ID = pkt.incomingHTLCID

	// We send the HTLC message to the peer which initially created the
	// HTLC. If the incoming blinding point is non-nil, we know that we
	// are a relaying node in a blinded path. Otherwise, we're either an
	// introduction node or not part of a blinded path at all.
	err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason)
	if err != nil {
		l.log.Errorf("unable to send HTLC failure: %v", err)

		return
	}

	// If the packet has a link failure set, the HTLC failed at our node,
	// so we notify a link failure. Otherwise, it failed further down the
	// route and we notify a forwarding failure.
	if pkt.linkFailure != nil {
		l.cfg.HtlcNotifier.NotifyLinkFailEvent(
			newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
			pkt.linkFailure, false,
		)
	} else {
		l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
			newHtlcKey(pkt), getEventType(pkt),
		)
	}

	// Immediately update the commitment tx to minimize latency.
	l.updateCommitTxOrFail(ctx)
}
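
// A hedged sketch of the notification split above: packets that carry a
// linkFailure were failed by this node itself, while all other failures
// were relayed back from further down the route. The failEventKind helper
// and its return values are hypothetical, for illustration only.
func failEventKind(linkFailure *LinkError) string {
	if linkFailure != nil {
		// The HTLC was failed by our own link, e.g. due to a policy
		// violation, so it's reported as a link failure.
		return "link-fail"
	}

	// The failure originated at a downstream node, so it's reported as a
	// forwarding failure.
	return "forwarding-fail"
}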

// validateHtlcAmount checks if the HTLC amount is within the policy's
// minimum and maximum limits. Returns a LinkError if validation fails.
func (l *channelLink) validateHtlcAmount(policy models.ForwardingPolicy,
	payHash [32]byte, amt lnwire.MilliSatoshi,
	originalScid lnwire.ShortChannelID,
	customRecords lnwire.CustomRecords) *LinkError {

	// In case we are dealing with a custom HTLC, we don't need to
	// validate the HTLC constraints.
	//
	// NOTE: Custom HTLCs are only locally sourced and use custom
	// channels, which are not routable channels and should not have their
	// policy restricted in the first place. However, to be sure, we skip
	// this check; otherwise we might end up in a loop of sending along
	// the same route again and again, because link errors are not
	// persisted in mission control.
	if fn.MapOptionZ(
		l.cfg.AuxTrafficShaper,
		func(ts AuxTrafficShaper) bool {
			return ts.IsCustomHTLC(customRecords)
		},
	) {

		l.log.Debugf("Skipping htlc amount policy validation for " +
			"custom htlc")

		return nil
	}

	// As our first sanity check, we'll ensure that the passed HTLC isn't
	// too small for the next hop. If so, then we'll cancel the HTLC
	// directly.
	if amt < policy.MinHTLCOut {
		l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
			"htlc_value=%v", payHash[:], policy.MinHTLCOut,
			amt)

		// As part of the returned error, we'll send our latest
		// routing policy so the sending node obtains the most
		// up-to-date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewAmountBelowMinimum(amt, *upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)

		return NewLinkError(failure)
	}

	// Next, ensure that the passed HTLC isn't too large. If so, we'll
	// cancel the HTLC directly.
	if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
		l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
			"htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

		// As part of the returned error, we'll send our latest
		// routing policy so the sending node obtains the most
		// up-to-date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewTemporaryChannelFailure(upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)

		return NewDetailedLinkError(
			failure, OutgoingFailureHTLCExceedsMax,
		)
	}

	return nil
}
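
// A self-contained sketch of the two bounds checks above, stripped of the
// failure-message plumbing. The checkAmountBounds helper is hypothetical,
// for illustration only; the real code returns structured LinkErrors that
// embed our latest channel update for the sender.
func checkAmountBounds(amt, minHTLCOut, maxHTLC lnwire.MilliSatoshi) error {
	// Reject HTLCs below the advertised minimum for the outgoing
	// channel; a sender would see an AmountBelowMinimum failure.
	if amt < minHTLCOut {
		return fmt.Errorf("htlc %v below min_htlc %v", amt,
			minHTLCOut)
	}

	// A MaxHTLC of zero means no maximum was configured, so the ceiling
	// is only enforced when it is set.
	if maxHTLC != 0 && amt > maxHTLC {
		return fmt.Errorf("htlc %v above max_htlc %v", amt, maxHTLC)
	}

	return nil
}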