lightningnetwork / lnd / build 16181619122

09 Jul 2025 10:33PM UTC coverage: 55.326% (-2.3%) from 57.611%

Pull Request #10060: sweep: fix expected spending events being missed
Merge d15e8671f into 0e830da9d (github web-flow)

9 of 26 new or added lines in 2 files covered (34.62%).
23695 existing lines in 280 files now uncovered.
108518 of 196143 relevant lines covered (55.33%).
22354.81 hits per line.

Source file: /htlcswitch/link.go (67.36% covered)
package htlcswitch

import (
        "bytes"
        "context"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to
        // the current block height. The reason to have a maximum is to prevent
        // funds from getting locked up for an unreasonably long time.
        // Otherwise, an attacker willing to lock up its own funds too could
        // force the funds of this node to be locked up for an indefinite
        // (max int32) number of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other lightning
        // implementations and past lnd versions which used to have a default
        // CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies
        // to the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)
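
// For intuition on the 2016 value above, some rough worst-case arithmetic
// using the numbers from the comment (illustrative only, not used by the
// code):
//
//        20 hops * 40 blocks (default CLTV delta)  =  800 blocks
//        20 hops * 144 blocks (legacy lnd delta)   = 2880 blocks
//
// 2016 blocks is roughly two weeks at 144 blocks per day: comfortably above
// the default-delta worst case, while still bounding how long an attacker can
// keep this node's funds locked up.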

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
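
// For illustration only (hypothetical values, not part of the link's logic):
// given the divisor above, FeeRate acts as a proportional fee in
// parts-per-million. With a base fee of 1,000 msat and a rate of 100 ppm, a
// 100,000 msat HTLC is expected to pay 1,000 + (100,000*100)/1,000,000 =
// 1,010 msat:
//
//        policy := models.ForwardingPolicy{
//                BaseFee: 1000, // msat
//                FeeRate: 100,  // proportional fee, parts-per-million
//        }
//        fee := ExpectedFee(policy, 100_000) // 1010 msat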

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
        // blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier. The last boolean is
        // used to decide whether this is a reforwarding or not - when it's
        // reforwarding, we skip the replay check enforced in our decay log.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding HTLC
        // Sphinx onion blob, and creating onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel
        // link opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be
        // able to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to update
        // outside sub-systems with this channel's latest ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of the
        // latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if
        // our clients have been restarted, or the remote peer has been
        // reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, so as not to
        // hinder syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine
        // if a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be
        // used in testing, it is here to ensure the sphinx replay detection
        // on the receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry
        // of an htlc where we don't offer an htlc anymore. This should be at
        // least the outgoing broadcast delta, because in any case we don't
        // want to risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the
        // link should accept for a forwarded HTLC. The value is relative to
        // the current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier
        // when a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe
        // htlc events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found
        // for the channel. This value being set means that we previously sent
        // a Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the
        // outgoing direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding
        // payments in blinded routes by failing back any blinding-related
        // payloads as if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the
        // link should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can
        // be used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment
// protocol message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch.
        // Once we lock them into our outgoing commitment, then the circuit
        // has a keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the
        // commitment we'll ACK all these to ensure that they don't get
        // re-delivered if we reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox.
        // Messages include new updates from our connected peer, and new
        // packets to be forwarded sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer
        // to the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer.
        // The batch will be flushed just before accepting the subsequent
        // CommitSig or on shutdown to avoid doing a write for each preimage
        // received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from invoice
        // registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send
        // our next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is
        // with respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back
        // with the result.
        quiescenceReqs chan StfuReq

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time
        // invoke is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook and returns the
// id assigned to it.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then
// clear out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
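
// Illustrative sketch of the transient-hook semantics above (hypothetical
// usage, not part of the link's logic): a hook registered via alloc fires on
// the next invoke and is then dropped.
//
//        m := newHookMap()
//        m.alloc(func() { fmt.Println("flushed") })
//        m.invoke() // prints "flushed" and clears the transient set
//        m.invoke() // no-op: the hook was deleted during the first invoke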

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: defaultQuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                cg:                  fn.NewContextGuard(),
        }
}
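
// Minimal construction sketch (hypothetical names; nearly all of the
// ChannelLinkConfig fields documented above are elided here but MUST be
// populated in practice):
//
//        link := NewChannelLink(ChannelLinkConfig{
//                FwrdingPolicy: policy,
//                BestHeight:    func() uint32 { return bestHeight },
//                // ... all remaining (non-nil) fields omitted for brevity.
//        }, lnChannel)
//        if err := link.Start(); err != nil {
//                // handle startup failure
//        }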

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the
// channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager goroutine, revert any circuits
        // that were marked open in the switch's circuit map, but did not make
        // it into a commitment txn. We use the next local htlc index as the
        // cut off point, since all indexes below that are committed. This
        // action is only performed if the link's final short channel ID has
        // been assigned, otherwise we would try to trim the htlcs belonging
        // to the all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll
                // update the ChainArbitrator with the set of new channel
                // signals for this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.cg.WgAdd(1)
        go l.htlcManager(context.TODO())

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        l.cg.Quit()
        l.cg.WgWait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to reevaluate any lingering Adds
        // that were delivered but didn't make it on a commitment to be failed
        // back if the link is offline for an extended period of time. The
        // error is ignored since it can only fail when the daemon is exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be
        // re-delivered after channel reestablishment, however this adds an
        // extra layer of protection in case the peer never returns. Without
        // this, we will be unable to settle any contracts depending on the
        // preimages even though we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}
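
// Illustrative sketch of the Enable/DisableAdds return values (hypothetical
// usage): because both are built on atomic.Bool.Swap, only the call that
// actually flips the state reports a change.
//
//        l.DisableAdds(Outgoing) // true:  outgoing adds are now blocked
//        l.DisableAdds(Outgoing) // false: already blocked, nothing changed
//        l.EnableAdds(Outgoing)  // true:  adds were blocked before this call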

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.cg.Done():
        }
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever
// be called once. If no CommitSig is owed in the argument's LinkDirection,
// then this hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.cg.Done():
        }
}
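
// Usage sketch (hypothetical): block until the next outgoing CommitSig has
// been sent, e.g. while coordinating a flush.
//
//        done := make(chan struct{})
//        l.OnCommitOnce(Outgoing, func() { close(done) })
//        <-done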

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive-only channel that will block until quiescence has been achieved, or
// definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.cg.Done():
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within
        // 3 blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}
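
// The same sampling works against any chainfee.Estimator implementation; a
// sketch (assuming the static estimator lnd ships for testing, with
// fee-per-kw and relay-fee arguments):
//
//        est := chainfee.NewStaticEstimator(12500, 0)
//        feePerKw, _ := est.EstimateFeePerKW(3) // 12500 sat/kw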

// shouldAdjustCommitFee returns true if we should update our commitment fee
// to match that of the network fee. We'll only update our commitment fee if
// the network fee deviates from our commitment fee by at least 10% in either
// direction, or if our current commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
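
// Worked example of the 10% band above (illustrative values, sidestepping the
// minimum relay fee case by passing 0):
//
//        shouldAdjustCommitFee(1100, 1000, 0) // true:  1100 >= 1000 + 100
//        shouldAdjustCommitFee(1099, 1000, 0) // false: inside the band
//        shouldAdjustCommitFee(900, 1000, 0)  // true:  900 <= 1000 - 100
//        shouldAdjustCommitFee(901, 1000, 0)  // false: inside the band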

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fallback
        // to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}
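
// Typical callback shape (a sketch; assuming lnwire.NewTemporaryChannelFailure
// as one of the lnwire failure constructors that embed a channel update, and
// a hypothetical outgoingScid in scope):
//
//        failure := l.createFailureWithUpdate(
//                false, outgoingScid,
//                func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
//                        return lnwire.NewTemporaryChannelFailure(upd)
//                },
//        )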

// syncChanStates attempts to synchronize channel states with the remote
// party. This method is to be called upon reconnection after the initial
// funding flow. We'll compare our commitment chains with the remote party,
// and re-send either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates(ctx context.Context) error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:ll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias slice
                                // so it is ok to use a pointer to the first
                                // entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                ctx, cancel := l.cg.Create(ctx)
                defer cancel()
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon as
                // possible.
                for _, msg := range msgsToReSend {
                        err := l.cfg.Peer.SendMessage(false, msg)
                        if err != nil {
                                l.log.Errorf("failed to send %v: %v",
                                        msg.MsgType(), err)
                        }
                }

        case <-l.cg.Done():
                return ErrLinkShuttingDown
        }

        return nil
}

// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs
// we previously received are reinstated in memory, and forwarded to the switch
// if necessary. After a restart, this will also delete any previously
// completed packages.
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))

        for _, fwdPkg := range fwdPkgs {
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
                        return err
                }
        }

        // If any of our reprocessing steps require an update to the commitment
        // txn, we initiate a state transition to capture all relevant changes.
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
                return l.updateCommitTx(ctx)
        }

        return nil
}

// resolveFwdPkg interprets the FwdState of the provided package, and either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
        // Remove any completed packages to clear up space.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("removing completed fwd pkg for height=%d",
                        fwdPkg.Height)

                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
                if err != nil {
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
                                "%v", fwdPkg.Height, err)
                        return err
                }
        }

        // Otherwise this is either a new package or one that has gone through
        // processing, but contains htlcs that need to be restored in memory.
        // We replay this forwarding package to make sure our local mem state
        // is resurrected, we mimic any original responses back to the remote
        // party, and re-forward the relevant HTLCs to the switch.

        // If the package is fully acked but not completed, it must still have
        // settles and fails to propagate.
        if !fwdPkg.SettleFailFilter.IsFull() {
                l.processRemoteSettleFails(fwdPkg)
        }

        // Finally, replay *ALL ADDS* in this forwarding package. The
        // downstream logic is able to filter out any duplicates, but we must
        // shove the entire, original set of adds down the pipeline so that the
        // batch of adds presented to the sphinx router does not ever change.
        if !fwdPkg.AckFilter.IsFull() {
                l.processRemoteAdds(fwdPkg)

                // If the link failed during processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return fmt.Errorf("link failed while " +
                                "processing remote adds")
                }
        }

        return nil
}

// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
        defer l.cg.WgDone()

        l.cfg.FwdPkgGCTicker.Resume()
        defer l.cfg.FwdPkgGCTicker.Stop()

        if err := l.loadAndRemove(); err != nil {
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
        }

        for {
                select {
                case <-l.cfg.FwdPkgGCTicker.Ticks():
                        if err := l.loadAndRemove(); err != nil {
                                l.log.Warnf("unable to remove fwd pkgs: %v",
                                        err)
                                continue
                        }
                case <-l.cg.Done():
                        return
                }
        }
}

// loadAndRemove loads all of the channel's forwarding packages and determines
// if they can be removed. It is called once before the FwdPkgGCTicker ticks so
// that a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        var removeHeights []uint64
        for _, fwdPkg := range fwdPkgs {
                if fwdPkg.State != channeldb.FwdStateCompleted {
                        continue
                }

                removeHeights = append(removeHeights, fwdPkg.Height)
        }

        // If removeHeights is empty, return early so we don't use a db
        // transaction.
        if len(removeHeights) == 0 {
                return nil
        }

        return l.channel.RemoveFwdPkgs(removeHeights...)
}

// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
        l.log.Warnf("error when syncing channel states: %v", err)

        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                l.log.Debugf("unable to sync channel states, link is " +
                        "shutting down")
                return

        // We failed syncing the commit chains, probably because the remote has
        // lost state. We should force close the channel.
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
                fallthrough

        // The remote sent us an invalid last commit secret, we should force
        // close the channel.
        // TODO(halseth): and permanently ban the peer?
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
                fallthrough

        // The remote sent us a commit point different from what they sent us
        // before.
        // TODO(halseth): ban peer?
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
                // We'll fail the link and tell the peer to force close the
                // channel. Note that the database state is not updated here;
                // it will be updated once the close transaction is ready, to
                // avoid the case where we go down before the transaction is
                // stored in the db.
                l.failf(
                        LinkFailureError{
                                code:          ErrSyncError,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to synchronize channel states: %v", err,
                )

        // We have lost state and cannot safely force close the channel. Fail
        // the channel and wait for the remote to hopefully force close it. The
        // remote has sent us its latest unrevoked commitment point, and we'll
        // store it in the database, such that we can attempt to recover the
        // funds if the remote force closes the channel.
        case errors.As(err, &errDataLoss):
                err := l.channel.MarkDataLoss(
                        errDataLoss.CommitPoint,
                )
                if err != nil {
                        l.log.Errorf("unable to mark channel data loss: %v",
                                err)
                }

        // We determined the commit chains were not possible to sync. We
        // cautiously fail the channel, but don't force close.
        // TODO(halseth): can we safely force close in any cases where this
        // error is returned?
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
                if err := l.channel.MarkBorked(); err != nil {
                        l.log.Errorf("unable to mark channel borked: %v", err)
                }

        // Other, unspecified error.
        default:
        }

        l.failf(
                LinkFailureError{
                        code:          ErrRecoveryError,
                        FailureAction: LinkFailureForceNone,
                },
                "unable to synchronize channel states: %v", err,
        )
}
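
// In the local data-loss case above, persisting the remote's unrevoked
// commitment point via MarkDataLoss is what makes recovery possible: should
// the remote force close, that point can be used to derive the keys for our
// output on their commitment transaction. This mirrors the
// option_data_loss_protect flow from BOLT #2, where channel_reestablish
// carries the sender's current per-commitment point.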

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
// the downstream channel managed by the channel link. In the event that an
// htlc needs to be forwarded, then the send-only forward handler is used,
// which sends htlc packets to the switch. Additionally, this goroutine handles
// acting upon all timeouts for any active HTLCs, manages the channel's
// revocation window, and also the htlc trickle queue+timer for this active
// channel.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager(ctx context.Context) {
        defer func() {
                l.cfg.BatchTicker.Stop()
                l.cg.WgDone()
                l.log.Infof("exited")
        }()

        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

        // Notify any clients that the link is now in the switch via an
        // ActiveLinkEvent. We'll also defer an inactive link notification for
        // when the link exits to ensure that every active notification is
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // If the link is not started for the first time, we need to take extra
        // steps to resume its state.
        err := l.resumeLink(ctx)
        if err != nil {
                l.log.Errorf("resuming link failed: %v", err)
                return
        }

        // Now that we've received both channel_ready and channel reestablish,
        // we can go ahead and send the active channel notification. We'll also
        // defer the inactive notification for when the link exits to ensure
        // that every active notification is matched by an inactive one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // Pause or resume the batch ticker.
                l.toggleBatchTicker()

                select {
                // We have a new hook that needs to be run when we reach a
                // clean channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
                        err := l.handleUpdateFee(ctx)
                        if err != nil {
                                l.log.Errorf("failed to handle update fee: "+
                                        "%v", err)
                        }

                // The underlying channel has notified us of a unilateral close
                // carried out by the remote peer. In the case of such an
                // event, we'll wipe the channel state from the peer, and mark
                // the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.failf(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(ctx, pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(ctx, msg)

                // An htlc resolution is received. This means that we now have
                // a resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        err := l.handleHtlcResolution(ctx, hodlItem)
                        if err != nil {
                                l.log.Errorf("failed to handle htlc "+
                                        "resolution: %v", err)
                        }

                // A user-initiated quiescence request is received. We now
                // forward it to the quiescer.
                case qReq := <-l.quiescenceReqs:
                        err := l.handleQuiescenceReq(qReq)
                        if err != nil {
                                l.log.Errorf("failed handle quiescence "+
                                        "req: %v", err)
                        }

                case <-l.cg.Done():
                        return
                }
        }
}
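
// The three hook cases above share one shape: a transient hook either runs
// immediately if its predicate already holds (clean channel, no owed
// commitment, no needed commitment), or is parked via alloc() until the
// corresponding state transition invokes it. Schematically (a sketch, not
// additional link state):
//
//	case hook := <-hooks.newTransients:
//		if predicateAlreadyHolds() {
//			hook()
//		} else {
//			hooks.alloc(hook)
//		}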

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(ctx context.Context,
        firstResolution invoices.HtlcResolution) error {

        // Try to read all waiting resolution messages, so that they can all be
        // processed in a single commitment tx update.
        htlcResolution := firstResolution
loop:
        for {
                // Look up all hodl htlcs that can be failed or settled with
                // this event. The hodl htlc must be present in the map.
                circuitKey := htlcResolution.CircuitKey()
                hodlHtlc, ok := l.hodlMap[circuitKey]
                if !ok {
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
                }

                err := l.processHtlcResolution(htlcResolution, hodlHtlc)
                if err != nil {
                        return err
                }

                // Clean up hodl map.
                delete(l.hodlMap, circuitKey)

                select {
                case item := <-l.hodlQueue.ChanOut():
                        htlcResolution = item.(invoices.HtlcResolution)

                // No need to process it if the link is broken.
                case <-l.cg.Done():
                        return ErrLinkShuttingDown

                default:
                        break loop
                }
        }

        // Update the commitment tx.
        if err := l.updateCommitTx(ctx); err != nil {
                return err
        }

        return nil
}
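
// The select with a default case above is what bounds the loop: it drains
// only the resolutions already sitting in the hodl queue and then falls
// through to a single updateCommitTx call, so N queued settles/fails cost
// one commitment signature rather than N.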

// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine required action for the resolution based on the type of
        // resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
                )

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.add.Amount)

                l.sendHTLCError(
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
                        true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}

// getResolutionFailure returns the wire message that a htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
        amount lnwire.MilliSatoshi) *LinkError {

        // If the resolution has been resolved as part of an MPP timeout,
        // we need to fail the htlc with lnwire.FailMppTimeout.
        if resolution.Outcome == invoices.ResultMppTimeout {
                return NewDetailedLinkError(
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
                )
        }

        // If the htlc is not an MPP timeout, we fail it with
        // FailIncorrectDetails. This error is sent for invoice payment
        // failures such as underpayment, expiry too soon, and hodl invoices
        // (which return FailIncorrectDetails to avoid leaking information).
        incorrectDetails := lnwire.NewFailIncorrectDetails(
                amount, uint32(resolution.AcceptHeight),
        )

        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}
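
// Since prand.Int63n(upper-lower) is uniform over [0, upper-lower), the
// returned duration is uniform over [MinUpdateTimeout, MaxUpdateTimeout).
// The jitter keeps a node's links from proposing fee updates in lockstep.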

// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
        pkt *htlcPacket) error {

        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
        if !ok {
                return errors.New("not an UpdateAddHTLC packet")
        }

        // If we are flushing the link in the outgoing direction or we have
        // already sent Stfu, then we can't add new htlcs to the link and we
        // need to bounce it.
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        &lnwire.FailTemporaryChannelFailure{},
                        OutgoingFailureLinkNotEligible,
                )
        }

        // If hodl.AddOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding an ADD to the
        // mailbox, and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
                l.log.Warnf(hodl.AddOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())
                return nil
        }

        // Check if we can add the HTLC here without exceeding the max fee
        // exposure threshold.
        if l.isOverexposedWithHtlc(htlc, false) {
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
                        "exposure exceeded")

                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        // A new payment has been initiated via the downstream channel,
        // so we add the new HTLC to our local log, then update the
        // commitment chains.
        htlc.ChanID = l.ChanID()
        openCircuitRef := pkt.inKey()

        // We enforce the fee buffer for the commitment transaction because
        // we are in control of adding this htlc. Nothing has locked in yet so
        // we can securely enforce the fee buffer which is only relevant if we
        // are the initiator of the channel.
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
        if err != nil {
                // The HTLC was unable to be added to the state machine,
                // as a result, we'll signal the switch to cancel the
                // pending payment.
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
                        err)

                // Remove this packet from the link's mailbox, this
                // prevents it from being reprocessed if the link
                // restarts and resets its mailbox. If this response
                // doesn't make it back to the originating link, it will
                // be rejected upon attempting to reforward the Add to
                // the switch, since the circuit was never fully opened,
                // and the forwarding package shows it as
                // unacknowledged.
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
                "local_log_index=%v, pend_updates=%v",
                htlc.PaymentHash[:], index,
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

        pkt.outgoingChanID = l.ShortChanID()
        pkt.outgoingHTLCID = index
        htlc.ID = index

        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

        err = l.cfg.Peer.SendMessage(false, htlc)
        if err != nil {
                l.log.Errorf("failed to send UpdateAddHTLC: %v", err)
        }

        // Send a forward event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
                newHtlcKey(pkt),
                HtlcInfo{
                        IncomingTimeLock: pkt.incomingTimeout,
                        IncomingAmt:      pkt.incomingAmount,
                        OutgoingTimeLock: htlc.Expiry,
                        OutgoingAmt:      htlc.Amount,
                },
                getEventType(pkt),
        )

        l.tryBatchUpdateCommitTx(ctx)

        return nil
}
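
// After a successful AddHTLC above, the ADD is tracked in three places that
// must stay consistent: the channel's update log, the in-memory
// keystoneBatch awaiting persistence, and the mailbox packet that is only
// acked once the keystone has been written and a commitment signed (see
// updateCommitTx and ackDownStreamPackets below).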

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
        pkt *htlcPacket) {

        if pkt.htlc.MsgType().IsChannelUpdate() &&
                !l.quiescer.CanSendUpdates() {

                l.log.Warnf("unable to process channel update. "+
                        "ChannelID=%v is quiescent.", l.ChanID)

                return
        }

        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)

        case *lnwire.UpdateFulfillHTLC:
                l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc)

        case *lnwire.UpdateFailHTLC:
                l.processLocalUpdateFailHTLC(ctx, pkt, htlc)
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if pending < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail(ctx)
}
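
// BatchSize only bounds how many updates may accumulate unsigned; it does
// not delay them indefinitely, since the BatchTicker case in htlcManager
// signs whatever is pending on each tick regardless of batch fullness.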

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this link
        // from resending the packet to the switch. To do so, we ack the AddRef
        // of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal of
                // the destination reference or circuit, so we exit early. The
                // cleanup may proceed with a different packet in the future
                // that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to tear down the response reference and circuit
        // map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the circuit.
        // When retransmitting responses, the destination references will be
        // cleaned up if an open circuit is not found in the circuit map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

        // With all known references acked, we can now safely delete the circuit
        // from the switch's circuit map, as the state is no longer needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer whom we have a
// direct channel with, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
        msg lnwire.Message) {

        l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())

        // First check if the message is an update and we are capable of
        // receiving updates right now.
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
                l.stfuFailf("update received after stfu: %T", msg)
                return
        }

        var err error

        switch msg := msg.(type) {
        case *lnwire.UpdateAddHTLC:
                err = l.processRemoteUpdateAddHTLC(msg)

        case *lnwire.UpdateFulfillHTLC:
                err = l.processRemoteUpdateFulfillHTLC(msg)

        case *lnwire.UpdateFailMalformedHTLC:
                err = l.processRemoteUpdateFailMalformedHTLC(msg)

        case *lnwire.UpdateFailHTLC:
                err = l.processRemoteUpdateFailHTLC(msg)

        case *lnwire.CommitSig:
                err = l.processRemoteCommitSig(ctx, msg)

        case *lnwire.RevokeAndAck:
                err = l.processRemoteRevokeAndAck(ctx, msg)

        case *lnwire.UpdateFee:
                err = l.processRemoteUpdateFee(msg)

        case *lnwire.Stfu:
                err = l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err.Error())
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                l.processRemoteError(msg)

        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }

        if err != nil {
                l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
                        err)
        }
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}

// stfuFailf fails the link in the case where the requirements of the
// quiescence protocol are violated. In all cases we opt to drop the connection
// as only link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are 0 updates that were originally
// issued by the given party on either the Local or Remote commitment
// transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}
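
// Put differently, "no dangling updates" for a party means both cells of
// that party's row in the 2x2 update matrix are zero: updates it originated
// that are pending on the local commitment, and updates it originated that
// are pending on the remote commitment. Quiescence (see handleStfu above)
// requires this before an Stfu may be accepted or an owed Stfu sent.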

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the switch, and cleaning up any circuits closed
// by signing a previous commitment txn. This method ensures that the circuits
// are removed from the circuit map before removing them from the link's
// mailbox, otherwise it could be possible for some circuit to be missed if
// this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from being
        // replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets. We
        // batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now safe
        // to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}
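
// The ordering above is the crash-safety argument from the method comment in
// concrete terms: circuits are deleted from the circuit map (disk) before
// the corresponding Settle/Fail packets are acked in the mailbox. If the
// link flaps in between, the packets are simply redelivered and the deletion
// retried; acking first could instead orphan circuits whose packets are
// already gone.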

// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
        err := l.updateCommitTx(ctx)
        switch {
        // No error encountered, success.
        case err == nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx(ctx context.Context) error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting here
        // permits testing of either the switch or link's ability to trim
        // circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        ctx, done := l.cg.Create(ctx)
        defer done()

        newCommit, err := l.channel.SignNextCommitment(ctx)
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes%v", n,
                        lnutils.SpewLogClosure(l.openedCircuits),
                        lnutils.SpewLogClosure(l.closedCircuits))

                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.cg.Done():
                return ErrLinkShuttingDown
        default:
        }

        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
        if err != nil {
                return fmt.Errorf("error parsing aux sigs: %w", err)
        }

        commitSig := &lnwire.CommitSig{
                ChanID:        l.ChanID(),
                CommitSig:     newCommit.CommitSig,
                HtlcSigs:      newCommit.HtlcSigs,
                PartialSig:    newCommit.PartialSig,
                CustomRecords: auxBlobRecords,
        }
        err = l.cfg.Peer.SendMessage(false, commitSig)
        if err != nil {
                l.log.Errorf("failed to send CommitSig: %v", err)
        }

        // Now that we have sent out a new CommitSig, we invoke the outgoing set
        // of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}
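
// The ErrNoWindow branch above is the quiet half of a watchdog: it resumes
// the PendingCommitTicker while we wait for the peer's revocation, and the
// ticker's case in htlcManager fails the link with ErrRemoteUnresponsive if
// the window never reopens. A successful pass through ackDownStreamPackets
// pauses the ticker again.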

// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from
        // disk. This ensures that the channel state accurately reflects the
        // updated short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

        return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis. This
// is used so that the Switch can have access to the commitment fee without
// needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
        if remote {
                return l.channel.State().RemoteCommitment.CommitFee
        }

        return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate, taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments using
        // this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}

// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
// the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)

                return true
        }

        return false
}
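
// Illustrative arithmetic only (hypothetical numbers, not defaults): with
// feeRate = 5,000 sat/kw and input.HTLCWeight's 172 WU, a non-dust HTLC adds
// FeeForWeight = 5,000 * 172 / 1,000 = 860 sats of fee exposure, while a
// 100 sat HTLC under a 330 sat dust limit instead adds its full 100 sats to
// the dust sum. Either way the exposure only grows, which is why both sums
// are checked against MaxFeeExposure.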

// dustClosure is a function that evaluates whether an HTLC is dust. It returns
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
// denoting whether to evaluate on the local or remote commit, and finally an
// HTLC amount to test.
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
        remoteDustLimit btcutil.Amount) dustClosure {

        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {

                var dustLimit btcutil.Amount
                if whoseCommit.IsLocal() {
                        dustLimit = localDustLimit
                } else {
                        dustLimit = remoteDustLimit
                }

                return lnwallet.HtlcIsDust(
                        chantype, incoming, whoseCommit, feerate, amt,
                        dustLimit,
                )
        }

        return isDust
}
2337

2338
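// Editor's sketch (not part of lnd): the shape of dustHelper above, reduced
// to plain integers. The returned closure captures both dust limits and
// selects one based on whose commitment is being evaluated. The real
// implementation additionally adds the HTLC timeout/success transaction fee
// before comparing, which is what lnwallet.HtlcIsDust encapsulates.
func makeDustClosure(localLimit, remoteLimit int64) func(localCommit bool,
        amt int64) bool {

        return func(localCommit bool, amt int64) bool {
                limit := remoteLimit
                if localCommit {
                        limit = localLimit
                }

                // Below the relevant dust limit means the output is
                // uneconomical and is trimmed from the commitment.
                return amt < limit
        }
}
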
// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {

        l.Lock()
        l.cfg.FailAliasUpdate = closure
        l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
        l.Lock()
        l.mailBox = mailbox
        l.upstream = mailbox.MessageOutBox()
        l.downstream = mailbox.PacketOutBox()
        l.Unlock()

        // Set the mailbox's fee rate. This may be refreshing a feerate that
        // was never committed.
        l.mailBox.SetFeeRate(l.getFeeRate())

        // Also set the mailbox's dust closure so that it can query whether
        // HTLCs are dust given the current feerate.
        l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. We assume that
// fields that are zero are intentionally set to zero, so we'll use newPolicy
// to update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
        newPolicy models.ForwardingPolicy) {

        l.Lock()
        defer l.Unlock()

        l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise, a
// LinkError with a valid protocol failure message should be returned in
// order to signal the policy violation to the source of the HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing
        // fee this incoming HTLC must carry in order to satisfy the
        // constraints of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount.
        // However, rounding may cause the result to be slightly higher than
        // in the case of separately rounded fee components. This potentially
        // causes failed forwards for senders and is something to be avoided.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/dated
        // information to construct the forwarding information for this hop.
        // In any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most up to
                // date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our time-lock
        // delta should be at least the outgoing time lock. Otherwise, either
        // the sender messed up, or an intermediate node tampered with the
        // HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
                        "expected at least %v block delta, got %v block delta",
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}

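// Editor's sketch (not part of lnd): the rounding argument behind the
// "calculate both fees separately" comment in CheckHtlcForward. With floor
// division, floor(a*r1) + floor(a*r2) can be one unit less than
// floor(a*(r1+r2)), so a single aggregate rate may demand a fee the sender
// never budgeted for. The rates are hypothetical parts-per-million and,
// unlike the real code, the inbound rate is applied to the bare amount here
// to isolate the rounding effect.
func feeRounding(amtMsat int64) (separate, aggregate int64) {
        const outRatePPM, inRatePPM = 1500, 700

        // Each component rounded down on its own, as the link does.
        separate = amtMsat*outRatePPM/1_000_000 + amtMsat*inRatePPM/1_000_000

        // One combined rate rounded down once. For amtMsat = 999_999 this
        // yields 2199, while separate rounding yields 2198.
        aggregate = amtMsat * (outRatePPM + inRatePPM) / 1_000_000

        return separate, aggregate
}
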
// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a valid
// protocol failure message should be returned in order to signal the
// violation. This call is intended to be used for locally initiated payments
// for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback
        // mechanism to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // As our first sanity check, we'll ensure that the passed HTLC isn't
        // too small for the next hop. If so, then we'll cancel the HTLC
        // directly.
        if amt < policy.MinHTLCOut {
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
                        amt)

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most up to
                // date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Next, ensure that the passed HTLC isn't too large. If so, we'll
        // cancel the HTLC directly.
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most
                // up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureHTLCExceedsMax,
                )
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()
        auxBandwidth, err := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if err != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
                auxBandwidth.Bandwidth.WhenSome(
                        func(bandwidth lnwire.MilliSatoshi) {
                                availableBandwidth = bandwidth
                        },
                )
        }

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, availableBandwidth)
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}

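// Editor's sketch (not part of lnd): the ordering of checks in canSendHtlc
// above, as a standalone predicate. Each failure maps to a distinct wire
// error in the real code; a string stands in for it here, and all parameters
// are simplified stand-ins.
func checkOutgoingHtlc(amt, minHTLC, maxHTLC, bandwidth uint64,
        timeout, height, rejectDelta, maxDelta uint32) string {

        switch {
        case amt < minHTLC:
                return "amount below minimum"
        case maxHTLC != 0 && amt > maxHTLC:
                return "amount exceeds maximum"
        case timeout <= height+rejectDelta:
                return "expiry too soon"
        case timeout > height+maxDelta:
                return "expiry too far"
        case amt > bandwidth:
                return "insufficient balance"
        default:
                // All policy constraints satisfied.
                return ""
        }
}
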
// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. This will always return fn.None() for a
// regular (non-custom) channel.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(OptionalBandwidth{
                        IsHandled: false,
                })
        }

        peerBytes := l.cfg.Peer.PubKey()

        peer, err := route.NewVertexFromBytes(peerBytes[:])
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to decode "+
                        "peer pub key: %v", err))
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
                l.channel.FetchLatestAuxHTLCView(), peer,
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(OptionalBandwidth{
                IsHandled: true,
                Bandwidth: fn.Some(auxBandwidth),
        })
}

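// Editor's sketch (not part of lnd): how the optional aux-bandwidth result
// overrides the regular channel bandwidth in canSendHtlc. Only when a
// traffic shaper both handled the channel and reported a value does the
// override apply; otherwise the BTC-level bandwidth stands. A nil pointer
// models the "no value" case of the real fn.Option type.
func effectiveBandwidth(regular uint64, handled bool, aux *uint64) uint64 {
        if handled && aux != nil {
                return *aux
        }

        return regular
}
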
// Stats returns the statistics of channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles the switch packets. These packets might be
// forwarded to us from another channel link in case the htlc update came
// from another peer, or if the update was created by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles htlc requests such as settle/add/fail which
// are sent to us from a remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.cg.Done():
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}

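// Editor's sketch (not part of lnd): the non-blocking shutdown guard used by
// HandleChannelUpdate above. The empty default case makes the select return
// immediately while the quit channel is still open, so the hot path never
// blocks; once quit is closed, new work is refused. Names are hypothetical.
func enqueueUnlessQuit(quit <-chan struct{}, work chan<- int, item int) bool {
        select {
        case <-quit:
                // Already shutting down; drop the work item.
                return false
        default:
        }

        work <- item
        return true
}
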
// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
        feePerKw chainfee.SatPerKWeight) error {

        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the
        // fee threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock
        // it in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx(ctx)
}

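// Editor's sketch (not part of lnd): the ordering that updateChannelFee
// enforces. Validation happens before any state is touched, local state is
// updated before the peer is told, and the commitment is only signed after
// the update_fee message is queued. The function parameters are hypothetical
// stand-ins for the real steps; fmt is already imported by this file.
func applyFeeUpdate(valid func() bool, updateLocal, notifyPeer,
        signCommit func() error) error {

        if !valid() {
                // Reject before mutating anything.
                return fmt.Errorf("link fee threshold exceeded")
        }
        if err := updateLocal(); err != nil {
                return err
        }
        if err := notifyPeer(); err != nil {
                return err
        }

        return signCommit()
}
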
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent
// to the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                l.log.Trace("fwd package has no settle/fails to process, " +
                        "exiting early")

                return
        }

        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which
                // will handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not
                        // signal a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has
                        // been freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failure message for a previously forwarded HTLC has been
                // received. As a result a new slot will be freed up in our
                // commitment state, so we'll forward this to the switch so
                // the backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the
                        // switch to an actual error, by encrypting it as if
                        // we were the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has
                        // been freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero
        // number.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}

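// Editor's sketch (not part of lnd): the converted-error heuristic used in
// processRemoteSettleFails. A failure that came back as
// update_fail_malformed_htlc is re-encoded without an HMAC, so its reason
// blob is exactly the failure message length plus the two 2-byte length
// prefixes. The 256 constant mirrors lnwire.FailureMessageLength at the time
// of writing.
func isConvertedError(reason []byte) bool {
        const failureMessageLength = 256
        return len(reason) == failureMessageLength+4
}
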
// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote
// party. The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed,
// or whether we are reprocessing as a result of a failure or restart. Adds
// that have already been acknowledged in the forwarding package will be
// ignored.
//
// NOTE: This function also needs to be called for fwd packages with no ADDs
// because it marks the fwdPkg as processed by writing the FwdFilter into the
// database.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        // decodeReqs is a list of requests sent to the onion decoder. We
        // expect the same length of responses to be returned.
        decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))

        // unackedAdds is a list of ADDs that's waiting for the remote's
        // settle/fail update.
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))

        for i, update := range fwdPkg.Adds {
                // If this index is already found in the ack filter, the
                // response to this forwarding decision has already been
                // committed by one of our commitment txns. ADDs in this state
                // are waiting for the rest of the fwding package to get acked
                // before being garbage collected.
                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(uint16(i)) {

                        continue
                }

                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                        unackedAdds = append(unackedAdds, msg)
                }
        }

        // If the fwdPkg has already been processed, it means we are
        // reforwarding the packets again, which happens only on a restart.
        reforward := fwdPkg.State == channeldb.FwdStateProcessed

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs, reforward,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range unackedAdds {
                idx := uint16(i)
                sourceRef := fwdPkg.SourceRef(idx)
                add := *update

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information
                // with the DecodeHopIterator function which processes the
                // Sphinx packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop iterator "+
                                "for htlc(id=%v, hash=%x): %v", add.ID,
                                add.PaymentHash, failureCode)

                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:ll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit
                        // infrastructure is setup to handle TLV payloads, so
                        // testing this would require implementing a separate
                        // mock iterator for TLV payloads that also supports
                        // injecting invalid payloads. Deferring this
                        // non-trivial effort till a later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve onion obfuscator from onion blob in order to
                // produce initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, err.Error()) //nolint

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If
                        // the fwd pkg has already been processed, then we
                        // entered the above section to recreate a previous
                        // error. If the packet had previously been forwarded,
                        // it would have been added to switchPackets at the
                        // top of this section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has
        // not been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
                len(switchPackets), reforward)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the
        // link. Failing to do this could cause reorderings/gaps in the range
        // of opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(reforward, switchPackets...)
}

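// Editor's sketch (not part of lnd): the replay-skip rule at the top of
// processRemoteAdds. On a reforward (the package was already processed), any
// index present in the ack filter has had its response committed and must
// not be handed to the pipeline again. The filter is modeled as a plain set
// here; the real code uses a persisted bitfield.
func addsToProcess(total int, processed bool, acked map[int]bool) []int {
        var pending []int
        for i := 0; i < total; i++ {
                if processed && acked[i] {
                        // Response already committed; skip on reforward.
                        continue
                }
                pending = append(pending, i)
        }

        return pending
}
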
// experimentalEndorsement returns the value, if any, to set for our outgoing
// experimental endorsement field. A None return indicates that the field
// should not be populated on the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field, consider it invalid if
        // it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of
        // information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}

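// Editor's sketch (not part of lnd): the relay decision made by
// experimentalEndorsement. Anything other than a well-formed endorsed value
// degrades to unendorsed; the signal is only propagated, never upgraded. The
// 0/1 constants stand in for lnwire.ExperimentalUnendorsed and
// lnwire.ExperimentalEndorsed.
func relayEndorsement(incoming []byte, present bool) byte {
        const unendorsed, endorsed = 0, 1

        if present && len(incoming) > 0 && incoming[0] == endorsed {
                return endorsed
        }

        // Missing, empty, or invalid values all degrade to unendorsed.
        return unendorsed
}
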
// processExitHop handles an htlc for which this link is the exit hop. It
3376
// returns a boolean indicating whether the commitment tx needs an update.
3377
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
3378
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
3379
        fwdInfo hop.ForwardingInfo, heightNow uint32,
3380
        payload invoices.Payload) error {
410✔
3381

410✔
3382
        // If hodl.ExitSettle is requested, we will not validate the final hop's
410✔
3383
        // ADD, nor will we settle the corresponding invoice or respond with the
410✔
3384
        // preimage.
410✔
3385
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
517✔
3386
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
107✔
3387
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)
107✔
3388

107✔
3389
                return nil
107✔
3390
        }
107✔
3391

3392
        // In case the traffic shaper is active, we'll check if the HTLC has
3393
        // custom records and skip the amount check in the onion payload below.
3394
        isCustomHTLC := fn.MapOptionZ(
303✔
3395
                l.cfg.AuxTrafficShaper,
303✔
3396
                func(ts AuxTrafficShaper) bool {
303✔
3397
                        return ts.IsCustomHTLC(add.CustomRecords)
×
3398
                },
×
3399
        )
3400

3401
        // As we're the exit hop, we'll double check the hop-payload included in
3402
        // the HTLC to ensure that it was crafted correctly by the sender and
3403
        // is compatible with the HTLC we were extended. If an external
3404
        // validator is active we might bypass the amount check.
3405
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
403✔
3406
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
100✔
3407
                        "incompatible value: expected <=%v, got %v",
100✔
3408
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)
100✔
3409

100✔
3410
                failure := NewLinkError(
100✔
3411
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
100✔
3412
                )
100✔
3413
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
100✔
3414

100✔
3415
                return nil
100✔
3416
        }
100✔
3417

3418
        // We'll also ensure that our time-lock value has been computed
3419
        // correctly.
3420
        if add.Expiry < fwdInfo.OutgoingCTLV {
204✔
3421
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
1✔
3422
                        "incompatible time-lock: expected <=%v, got %v",
1✔
3423
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)
1✔
3424

1✔
3425
                failure := NewLinkError(
1✔
3426
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
1✔
3427
                )
1✔
3428

1✔
3429
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
1✔
3430

1✔
3431
                return nil
1✔
3432
        }
1✔
3433

3434
        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
3435
        // after this, this code will be re-executed after restart. We will
3436
        // receive back a resolution event.
3437
        invoiceHash := lntypes.Hash(add.PaymentHash)
202✔
3438

202✔
3439
        circuitKey := models.CircuitKey{
202✔
3440
                ChanID: l.ShortChanID(),
202✔
3441
                HtlcID: add.ID,
202✔
3442
        }
202✔
3443

202✔
3444
        event, err := l.cfg.Registry.NotifyExitHopHtlc(
202✔
3445
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
202✔
3446
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
202✔
3447
        )
202✔
3448
        if err != nil {
202✔
3449
                return err
×
3450
        }
×
3451

3452
        // Create a hodlHtlc struct and decide either resolved now or later.
3453
        htlc := hodlHtlc{
202✔
3454
                add:        add,
202✔
3455
                sourceRef:  sourceRef,
202✔
3456
                obfuscator: obfuscator,
202✔
3457
        }
202✔
3458

202✔
3459
        // If the event is nil, the invoice is being held, so we save payment
202✔
3460
        // descriptor for future reference.
202✔
3461
        if event == nil {
258✔
3462
                l.hodlMap[circuitKey] = htlc
56✔
3463
                return nil
56✔
3464
        }
56✔
3465

3466
        // Process the received resolution.
3467
        return l.processHtlcResolution(event, htlc)
146✔
3468
}
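
// Flow sketch (illustrative only): how a held ("hodl") htlc is resolved
// later. The names are from this file; the sequencing is an assumption drawn
// from the surrounding code, not a verbatim excerpt.
//
//	1. NotifyExitHopHtlc returns a nil event, so the htlc is parked in
//	   l.hodlMap keyed by its (ShortChanID, HtlcID) circuit key.
//	2. The invoice registry later pushes an invoices.HtlcResolution onto
//	   l.hodlQueue.ChanIn().
//	3. The link drains the queue (see handleHtlcResolution below) and
//	   matches the resolution back to the parked htlc via the same
//	   circuit key.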

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally; send a notification
        // about it to the remote peer.
        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
        }

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}
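
// Sanity sketch (illustrative; assumes lntypes' sha256-based pairing of
// preimage and hash, and is not an excerpt from this file):
//
//	pre, _ := lntypes.MakePreimage(bytes.Repeat([]byte{1}, 32))
//	want := lntypes.Hash(sha256.Sum256(pre[:]))
//	// pre.Hash() == want, which is the relationship settleHTLC relies
//	// on when logging and settling by payment hash.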

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}
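
// Dedup sketch (an illustrative mental model only; the real check above
// consults the mailbox via HasPacket rather than a local set):
//
//	seen := make(map[models.CircuitKey]struct{})
//	for _, pkt := range packets {
//		if _, ok := seen[pkt.inKey()]; ok {
//			continue // a response for this circuit already exists
//		}
//		seen[pkt.inKey()] = struct{}{}
//		filteredPkts = append(filteredPkts, pkt)
//	}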

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC.
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments, and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // an UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %v", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
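
// Case map sketch (an illustrative summary of the switch above, nothing
// more):
//
//	encrypter           outgoing message          reason contents
//	---------           ----------------          ---------------
//	nil (local send)    UpdateFailHTLC            original failure
//	cleartext           UpdateFailHTLC            original failure
//	introduction node   UpdateFailHTLC            invalid_blinding,
//	                                              freshly encrypted
//	relaying node       UpdateFailMalformedHTLC   code only
//	                                              (CodeInvalidBlinding)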

// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err)
        }
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}

// handleHtlcResolution takes an HTLC resolution and processes it by draining
// the hodlQueue. Once processed, a commit_sig is sent to the remote to update
// their commitment.
func (l *channelLink) handleHtlcResolution(ctx context.Context,
        hodlItem any) error {

        htlcResolution, ok := hodlItem.(invoices.HtlcResolution)
        if !ok {
                return fmt.Errorf("expected HtlcResolution, got %T", hodlItem)
        }

        err := l.processHodlQueue(ctx, htlcResolution)
        // No error, success.
        if err == nil {
                return nil
        }

        switch {
        // If the duplicate keystone error was encountered, fail back
        // gracefully.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(
                        LinkFailureError{
                                code: ErrCircuitError,
                        },
                        "process hodl queue: temporary circuit error: %v", err,
                )

        // Send an Error message to the peer.
        default:
                l.failf(
                        LinkFailureError{
                                code: ErrInternalError,
                        },
                        "process hodl queue: unable to update commitment: %v",
                        err,
                )
        }

        return err
}

// handleQuiescenceReq takes a locally initialized (RPC) quiescence request and
// forwards it to the quiescer for further processing.
func (l *channelLink) handleQuiescenceReq(req StfuReq) error {
        l.quiescer.InitStfu(req)

        if !l.noDanglingUpdates(lntypes.Local) {
                return nil
        }

        err := l.quiescer.SendOwedStfu()
        if err != nil {
                l.stfuFailf("SendOwedStfu: %s", err.Error())
                res := fn.Err[lntypes.ChannelParty](err)
                req.Resolve(res)
        }

        return err
}

// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to
// decide whether we should send an `update_fee` msg to update the commitment's
// feerate.
func (l *channelLink) handleUpdateFee(ctx context.Context) error {
        // If we're not the initiator of the channel, we don't control the fees,
        // so we can ignore this.
        if !l.channel.IsInitiator() {
                return nil
        }

        // If we are the initiator, then we'll sample the current fee rate to
        // get into the chain within 3 blocks.
        netFee, err := l.sampleNetworkFee()
        if err != nil {
                return fmt.Errorf("unable to sample network fee: %w", err)
        }

        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

        newCommitFee := l.channel.IdealCommitFeeRate(
                netFee, minRelayFee,
                l.cfg.MaxAnchorsCommitFeeRate,
                l.cfg.MaxFeeAllocation,
        )

        // We determine if we should adjust the commitment fee based on the
        // current commitment fee, the suggested new commitment fee and the
        // current minimum relay fee rate.
        commitFee := l.channel.CommitFeeRate()
        if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) {
                return nil
        }

        // If we do, then we'll send a new UpdateFee message to the remote
        // party, to be locked in with a new update.
        err = l.updateChannelFee(ctx, newCommitFee)
        if err != nil {
                return fmt.Errorf("unable to update fee rate: %w", err)
        }

        return nil
}
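
// Worked example (illustrative; the numbers are assumptions, not sampled
// values): suppose the current commitment fee rate is 2500 sat/kw, the
// estimator's three-block target suggests 5000 sat/kw, and the minimum relay
// fee is 253 sat/kw. IdealCommitFeeRate clamps the suggestion by
// MaxAnchorsCommitFeeRate and MaxFeeAllocation, and because the clamped rate
// deviates from 2500 sat/kw (per shouldAdjustCommitFee's internal threshold)
// while staying above the relay floor, an update_fee is proposed. If the
// estimator instead returned roughly 2500 sat/kw, no update would be sent.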

// toggleBatchTicker checks whether we need to resume or pause the batch ticker.
// When we have no pending updates, the ticker is paused, otherwise resumed.
func (l *channelLink) toggleBatchTicker() {
        // If the previous event resulted in a non-empty batch, resume the batch
        // ticker so that it can be cleared. Otherwise pause the ticker to
        // prevent waking up the htlcManager while the batch is empty.
        numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if numUpdates > 0 {
                l.cfg.BatchTicker.Resume()
                l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+
                        "Remote)=%d", numUpdates)

                return
        }

        l.cfg.BatchTicker.Pause()
        l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
                "(Local, Remote)")
}

// resumeLink is called when starting a previously existing link. It runs
// through the reestablishment protocol and reforwards packets that are not
// yet resolved.
func (l *channelLink) resumeLink(ctx context.Context) error {
        // If this isn't the first time that this channel link has been created,
        // then we'll need to check to see if we need to re-synchronize state
        // with the remote peer. settledHtlcs is a map of HTLC's that we
        // re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates(ctx)
                if err != nil {
                        l.handleChanSyncErr(err)

                        return err
                }
        }

        // If a shutdown message has previously been sent on this link, then we
        // need to make sure that we have disabled any HTLC adds on the outgoing
        // direction of the link and that we re-send the same shutdown message
        // that we previously sent.
        //
        // TODO(yy): we should either move this to chanCloser, or move all
        // shutdown handling logic to be managed by the link, but not a mix of
        // partial management by two subsystems.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since syncChanStates
                // would have sent any outstanding CommitSig, it is fine for us
                // to immediately queue the shutdown message now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })

        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // With the channel states synced, we now reset the mailbox to ensure we
        // start processing all unacked packets in order. This is done here to
        // ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull unacked
        // packets after starting to read from the downstream mailbox.
        err := l.mailBox.ResetPackets()
        if err != nil {
                l.log.Errorf("failed to reset packets: %v", err)
        }

        // If the channel is pending, there's no need to reforward packets.
        if l.ShortChanID() == hop.Source {
                return nil
        }

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We will
        // only attempt to resolve packages if our short chan id indicates that
        // the channel is not pending, otherwise we should have no htlcs to
        // reforward.
        err = l.resolveFwdPkgs(ctx)
        switch {
        // No error was encountered, success.
        case err == nil:
                // With our link's in-memory state fully reconstructed, spawn a
                // goroutine to manage the reclamation of disk space occupied by
                // completed forwarding packages.
                l.cg.WgAdd(1)
                go l.fwdPkgGarbager()

                return nil

        // If the duplicate keystone error was encountered, we'll fail without
        // sending an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)

        // A non-nil error was encountered, send an Error message to
        // the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to resolve fwd pkgs: %v", err)
        }

        return err
}

// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote
// and processes it.
func (l *channelLink) processRemoteUpdateAddHTLC(
        msg *lnwire.UpdateAddHTLC) error {

        if l.IsFlushing(Incoming) {
                // This is forbidden by the protocol specification. The best
                // chance we have to deal with this is to drop the connection.
                // This should roll back the channel state to the last
                // CommitSig. If the remote has already sent a CommitSig we
                // haven't received yet, channel state will be re-synchronized
                // with a ChannelReestablish message upon reconnection and the
                // protocol state that caused us to flush the link will be
                // rolled back. In the event that there was some
                // non-deterministic behavior in the remote that caused them to
                // violate the protocol, we have a decent shot at correcting it
                // this way, since reconnecting will put us in the cleanest
                // possible state to try again.
                //
                // In addition to the above, it is possible for us to hit this
                // case in situations where we improperly handle message
                // ordering due to concurrency choices. An issue has been filed
                // to address this here:
                // https://github.com/lightningnetwork/lnd/issues/8393
                err := errors.New("received add while link is flushing")
                l.failf(
                        LinkFailureError{
                                code:             ErrInvalidUpdate,
                                FailureAction:    LinkFailureDisconnect,
                                PermanentFailure: false,
                                Warning:          true,
                        }, err.Error(),
                )

                return err
        }

        // Disallow htlcs with blinding points set if we haven't enabled the
        // feature. This saves us from having to process the onion at all, but
        // will only catch blinded payments where we are a relaying node (as the
        // blinding point will be in the payload when we're the introduction
        // node).
        if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
                err := errors.New("blinding point included when route " +
                        "blinding is disabled")

                l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error())

                return err
        }

        // We have to check the limit here rather than later in the switch
        // because the counterparty can keep sending HTLC's without sending a
        // revoke. This would mean that the switch check would only occur later.
        if l.isOverexposedWithHtlc(msg, true) {
                err := errors.New("peer sent us an HTLC that exceeded our " +
                        "max fee exposure")
                l.failf(LinkFailureError{code: ErrInternalError}, err.Error())

                return err
        }

        // We just received an add request from an upstream peer, so we add it
        // to our state machine, then add the HTLC to our "settle" list in the
        // event that we know the preimage.
        index, err := l.channel.ReceiveHTLC(msg)
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream add HTLC: %v", err)

                return err
        }

        l.log.Tracef("receive upstream htlc with payment hash(%x), "+
                "assigning index: %v", msg.PaymentHash[:], index)

        return nil
}

// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFulfillHTLC(
        msg *lnwire.UpdateFulfillHTLC) error {

        pre := msg.PaymentPreimage
        idx := msg.ID

        // Before we pipeline the settle, we'll check the set of active htlc's
        // to see if the related UpdateAddHTLC has been fully locked-in.
        var lockedin bool
        htlcs := l.channel.ActiveHtlcs()
        for _, add := range htlcs {
                // The HTLC will be outgoing and match idx.
                if !add.Incoming && add.HtlcIndex == idx {
                        lockedin = true
                        break
                }
        }

        if !lockedin {
                err := errors.New("unable to handle upstream settle")
                l.failf(LinkFailureError{code: ErrInvalidUpdate}, err.Error())

                return err
        }

        if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidUpdate,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to handle upstream settle HTLC: %v", err,
                )

                return err
        }

        settlePacket := &htlcPacket{
                outgoingChanID: l.ShortChanID(),
                outgoingHTLCID: idx,
                htlc: &lnwire.UpdateFulfillHTLC{
                        PaymentPreimage: pre,
                },
        }

        // Add the newly discovered preimage to our growing list of uncommitted
        // preimages. These will be written to the witness cache just before
        // accepting the next commitment signature from the remote peer.
        l.uncommittedPreimages = append(l.uncommittedPreimages, pre)

        // Pipeline this settle, send it to the switch.
        go l.forwardBatch(false, settlePacket)

        return nil
}
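
// Locked-in check sketch (illustrative; the element type of ActiveHtlcs is
// assumed from the field usage above): a settle is accepted only for an HTLC
// that we offered (outgoing) and whose index matches the settle's ID.
//
//	func isLockedIn(htlcs []channeldb.HTLC, idx uint64) bool {
//		for _, add := range htlcs {
//			if !add.Incoming && add.HtlcIndex == idx {
//				return true
//			}
//		}
//		return false
//	}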

// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg
// sent from the remote and processes it.
func (l *channelLink) processRemoteUpdateFailMalformedHTLC(
        msg *lnwire.UpdateFailMalformedHTLC) error {

        // Convert the failure type encoded within the HTLC fail message to the
        // proper generic lnwire error code.
        var failure lnwire.FailureMessage
        switch msg.FailureCode {
        case lnwire.CodeInvalidOnionVersion:
                failure = &lnwire.FailInvalidOnionVersion{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionHmac:
                failure = &lnwire.FailInvalidOnionHmac{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionKey:
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        // Handle malformed errors that are part of a blinded route. This case
        // is slightly different, because we expect every relaying node in the
        // blinded portion of the route to send malformed errors. If we're also
        // a relaying node, we're likely going to switch this error out anyway
        // for our own malformed error, but we handle the case here for
        // completeness.
        case lnwire.CodeInvalidBlinding:
                failure = &lnwire.FailInvalidBlinding{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        default:
                l.log.Warnf("unexpected failure code received in "+
                        "UpdateFailMalformedHTLC: %v", msg.FailureCode)

                // We don't just pass back the error we received from our
                // successor. Otherwise we might report a failure that penalizes
                // us more than needed. If the onion that we forwarded was
                // correct, the node should have been able to send back its own
                // failure. The node did not send back its own failure, so we
                // assume there was a problem with the onion and report that
                // back. We reuse the invalid onion key failure because there is
                // no specific error for this case.
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }
        }

        // With the error parsed, we'll convert it into its opaque form.
        var b bytes.Buffer
        if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                return fmt.Errorf("unable to encode malformed error: %w", err)
        }

        // If the remote side has been unable to parse the onion blob we sent
        // to it, then we should transform the malformed HTLC message into the
        // usual HTLC fail message.
        err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}

// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFailHTLC(
        msg *lnwire.UpdateFailHTLC) error {

        // Verify that the failure reason is at least 256 bytes plus overhead.
        const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32

        if len(msg.Reason) < minimumFailReasonLength {
                // We've received a reason with a non-compliant length. Older
                // nodes happily relay back these failures that may originate
                // from a node further downstream. Therefore we can't just fail
                // the channel.
                //
                // We want to be compliant ourselves, so we also can't pass back
                // the reason unmodified. And we must make sure that we don't
                // hit the magic length check of 260 bytes in
                // processRemoteSettleFails either.
                //
                // Because the reason is unreadable for the payer anyway, we
                // just replace it by a compliant-length series of random bytes.
                msg.Reason = make([]byte, minimumFailReasonLength)
                _, err := crand.Read(msg.Reason[:])
                if err != nil {
                        return fmt.Errorf("random generation error: %w", err)
                }
        }

        // Add fail to the update log.
        idx := msg.ID
        err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}
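
// Length accounting sketch (illustrative; the 2+2+32 split is an assumption
// based on the BOLT #4 encrypted failure layout, not stated in this file):
//
//	minimumFailReasonLength = 256 (FailureMessageLength)
//	                        +   2 (failure message length prefix)
//	                        +   2 (pad length prefix)
//	                        +  32 (HMAC)
//	                        = 292 bytes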

// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteCommitSig(ctx context.Context,
        msg *lnwire.CommitSig) error {

        // Since we may have learned new preimages for the first time, we'll add
        // them to our preimage cache. By doing this, we ensure any contested
        // contracts watched by any on-chain arbitrators can now sweep this HTLC
        // on-chain. We delay committing the preimages until just before
        // accepting the new remote commitment, as afterwards the peer won't
        // resend the Settle messages on the next channel reestablishment. Doing
        // so allows us to more effectively batch this operation, instead of
        // doing a single write per preimage.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInternalError},
                        "unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err,
                )

                return err
        }

        // Instead of truncating the slice to conserve memory allocations, we
        // simply set the uncommitted preimage slice to nil so that a new one
        // will be initialized if any more witnesses are discovered. We do this
        // because the maximum size that the slice can occupy is 15KB, and we
        // want to ensure we release that memory back to the runtime.
        l.uncommittedPreimages = nil

        // We just received new updates to our local commitment chain,
        // validate this new commitment, closing the link if invalid.
        auxSigBlob, err := msg.CustomRecords.Serialize()
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInvalidCommitment},
                        "unable to serialize custom records: %v", err,
                )

                return err
        }
        err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                CommitSig:  msg.CommitSig,
                HtlcSigs:   msg.HtlcSigs,
                PartialSig: msg.PartialSig,
                AuxSigBlob: auxSigBlob,
        })
        if err != nil {
                // If we were unable to reconstruct their proposed commitment,
                // then we'll examine the type of error. If it's an
                // InvalidCommitSigError, then we'll send a direct error.
                var sendData []byte
                switch {
                case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err):
                        sendData = []byte(err.Error())
                case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err):
                        sendData = []byte(err.Error())
                }
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidCommitment,
                                FailureAction: LinkFailureForceClose,
                                SendData:      sendData,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As we've just accepted a new state, we'll now immediately send the
        // remote peer a revocation for our prior state.
        nextRevocation, currentHtlcs, finalHTLCs, err :=
                l.channel.RevokeCurrentCommitment()
        if err != nil {
                l.log.Errorf("unable to revoke commitment: %v", err)

                // We need to fail the channel in case revoking our local
                // commitment does not succeed. We might have already advanced
                // our channel state which would lead us to proceed with an
                // unclean state.
                //
                // NOTE: We do not trigger a force close because this could
                // resolve itself in case our db was just busy not accepting new
                // transactions.
                l.failf(
                        LinkFailureError{
                                code:          ErrInternalError,
                                Warning:       true,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As soon as we are ready to send our next revocation, we can invoke
        // the incoming commit hooks.
        l.Lock()
        l.incomingCommitHooks.invoke()
        l.Unlock()

        err = l.cfg.Peer.SendMessage(false, nextRevocation)
        if err != nil {
                l.log.Errorf("failed to send RevokeAndAck: %v", err)
        }

        // Notify about the incoming htlcs whose resolutions were locked in.
        for id, settled := range finalHTLCs {
                l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                        models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: id,
                        },
                        channeldb.FinalHtlcInfo{
                                Settled:  settled,
                                Offchain: true,
                        },
                )
        }

        // Since we just revoked our commitment, we may have a new set of HTLC's
        // on our commitment, so we'll send them using our function closure
        // NotifyContractUpdate.
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.LocalHtlcSet,
                Htlcs:   currentHtlcs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If the remote party initiated the state transition, we'll reply with
        // a signature to provide them with their version of the latest
        // commitment. Otherwise, if both commitment chains are fully synced
        // from our PoV, we don't need to reply with a signature as both sides
        // already have a commitment with the latest accepted state.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // If we need to send out an Stfu, this would be the time to do so.
        if l.noDanglingUpdates(lntypes.Local) {
                err = l.quiescer.SendOwedStfu()
                if err != nil {
                        l.stfuFailf("sendOwedStfu: %v", err.Error())
                }
        }

        // Now that we have finished processing the incoming CommitSig and sent
        // out our RevokeAndAck, we invoke the flushHooks if the channel state
        // is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}
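
// Message flow sketch (an illustrative summary of the handler above; arrows
// denote wire messages, the right column the local calls involved):
//
//	remote                                local (this link)
//	  |------------ commit_sig ------------>|  ReceiveNewCommitment
//	  |<---------- revoke_and_ack ----------|  RevokeCurrentCommitment
//	  |<------ commit_sig (if owed) --------|  updateCommitTxOrFail
//	  |<--------- stfu (if owed) -----------|  quiescer.SendOwedStfu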

// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context,
        msg *lnwire.RevokeAndAck) error {

        // We've received a revocation from the remote peer. If valid, this
        // moves the remote chain forward and expands our revocation window.

        // We now process the message and advance our remote commit chain.
        fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
        if err != nil {
                // TODO(halseth): force close?
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidRevocation,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "unable to accept revocation: %v", err,
                )

                return err
        }

        // The remote party now has a new primary commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemoteHtlcSet,
                Htlcs:   remoteHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If we have a tower client for this channel type, we'll create a
        // backup for the current state.
        if l.cfg.TowerClient != nil {
                state := l.channel.State()
                chanID := l.ChanID()

                err = l.cfg.TowerClient.BackupState(
                        &chanID, state.RemoteCommitment.CommitHeight-1,
                )
                if err != nil {
                        l.failf(LinkFailureError{
                                code: ErrInternalError,
                        }, "unable to queue breach backup: %v", err)

                        return err
                }
        }

        // If we can send updates then we can process adds in case we are the
        // exit hop and need to send back resolutions, or in case there are
        // validity issues with the packets. Otherwise we defer the action until
        // resume.
        //
        // We are free to process the settles and fails without this check since
        // processing those can't result in further updates to this channel
        // link.
        if l.quiescer.CanSendUpdates() {
                l.processRemoteAdds(fwdPkg)
        } else {
                l.quiescer.OnResume(func() {
                        l.processRemoteAdds(fwdPkg)
                })
        }
        l.processRemoteSettleFails(fwdPkg)

        // If the link failed during processing the adds, we must return to
        // ensure we won't attempt to update the state further.
        if l.failed {
                return nil
        }

        // The revocation window opened up. If there are pending local updates,
        // try to update the commit tx. Pending updates could already have been
        // present because of a previously failed update to the commit tx or
        // freshly added in by processRemoteAdds. Also in case there are no
        // local updates, but there are still remote updates that are not in the
        // remote commit tx yet, send out an update.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // Now that we have finished processing the RevokeAndAck, we can invoke
        // the flushHooks if the channel state is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}

// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
        // Check and see if their proposed fee-rate would make us exceed the fee
        // threshold.
        fee := chainfee.SatPerKWeight(msg.FeePerKw)

        isDust, err := l.exceedsFeeExposureLimit(fee)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                l.log.Errorf("Unable to determine if fee threshold " +
                        "exceeded")
                l.failf(LinkFailureError{code: ErrInternalError},
                        "error calculating fee exposure: %v", err)

                return err
        }

        if isDust {
                // The proposed fee-rate makes us exceed the fee threshold.
                err := fmt.Errorf("fee threshold exceeded: fee rate %v", fee)
                l.failf(LinkFailureError{code: ErrInternalError}, err.Error())

                return err
        }

        // We received a fee update from the peer. If we are the initiator we
        // will fail the channel, if not we will apply the update.
        if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "error receiving fee update: %v", err)
                return err
        }

        // Update the mailbox's feerate as well.
        l.mailBox.SetFeeRate(fee)

        return nil
}

// processRemoteError takes an `Error` msg sent from the remote and fails the
// channel link.
func (l *channelLink) processRemoteError(msg *lnwire.Error) {
        // Error received from remote, MUST fail channel, but should only print
        // the contents of the error message if all characters are printable
        // ASCII.
        l.failf(
                // TODO(halseth): we currently don't fail the channel
                // permanently, as there are some sync issues with other
                // implementations that will lead to them sending an
                // error message, but we can recover from on next
                // connection. See
                // https://github.com/ElementsProject/lightning/issues/4212
                LinkFailureError{
                        code:             ErrRemoteError,
                        PermanentFailure: false,
                },
                "ChannelPoint(%v): received error from peer: %v",
                l.channel.ChannelPoint(), msg.Error(),
        )
}

// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local
// switch and processes it.
func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
        pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {

        // If hodl.SettleOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding the SETTLE to the mailbox,
        // and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
                l.log.Warnf(hodl.SettleOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())

                return
        }

        // An HTLC we forward to the switch has just settled somewhere upstream.
        // Therefore we settle the HTLC within our local state machine.
        inKey := pkt.inKey()
        err := l.channel.SettleHTLC(
                htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
                pkt.destRef, &inKey,
        )
        if err != nil {
                l.log.Errorf("unable to settle incoming HTLC for "+
                        "circuit-key=%v: %v", inKey, err)

                // If the HTLC index for the Settle response was not known to
                // our commitment state, it has already been cleaned up by a
                // prior response. We'll thus try to clean up any lingering
                // state to ensure we don't continue reforwarding.
                if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
                        l.cleanupSpuriousResponse(pkt)
                }

                // Remove the packet from the link's mailbox to ensure it
                // doesn't get replayed after a reconnection.
                l.mailBox.AckPacket(inKey)

                return
        }

        l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.closedCircuits = append(l.closedCircuits, pkt.inKey())

        // With the HTLC settled, we'll need to populate the wire message to
        // target the specific channel and HTLC to be canceled.
        htlc.ChanID = l.ChanID()
        htlc.ID = pkt.incomingHTLCID

        // Then we send the HTLC settle message to the connected peer so we can
        // continue the propagation of the settle message.
        err = l.cfg.Peer.SendMessage(false, htlc)
        if err != nil {
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
        }

        // Send a settle event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
        )

        // Immediately update the commitment tx to minimize latency.
        l.updateCommitTxOrFail(ctx)
}
4632
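
// At the protocol level, the SETTLE handled above is only valid if the
// preimage hashes to the HTLC's payment hash. A hedged, self-contained sketch
// of that relationship (the helper name is illustrative; lntypes also exposes
// a Matches helper expressing the same check):
//
//	func preimageMatches(preimage lntypes.Preimage, hash lntypes.Hash) bool {
//		return lntypes.Hash(sha256.Sum256(preimage[:])) == hash
//	}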

// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local and
// processes it.
func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
        pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {

        // If hodl.FailOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding a FAIL to the mailbox,
        // and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
                l.log.Warnf(hodl.FailOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())

                return
        }

        // An HTLC cancellation has been triggered somewhere upstream, so
        // we'll remove the HTLC from our local state machine.
        inKey := pkt.inKey()
        err := l.channel.FailHTLC(
                pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef,
                &inKey,
        )
        if err != nil {
                l.log.Errorf("unable to cancel incoming HTLC for "+
                        "circuit-key=%v: %v", inKey, err)

                // If the HTLC index for the Fail response was not known to
                // our commitment state, it has already been cleaned up by a
                // prior response. We'll thus try to clean up any lingering
                // state to ensure we don't continue reforwarding.
                if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
                        l.cleanupSpuriousResponse(pkt)
                }

                // Remove the packet from the link's mailbox to ensure it
                // doesn't get replayed after a reconnection.
                l.mailBox.AckPacket(inKey)

                return
        }

        l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.closedCircuits = append(l.closedCircuits, pkt.inKey())

        // With the HTLC removed, we'll need to populate the wire message to
        // target the specific channel and HTLC to be canceled. The "Reason"
        // field will have already been set within the switch.
        htlc.ChanID = l.ChanID()
        htlc.ID = pkt.incomingHTLCID

        // We send the HTLC message to the peer which initially created the
        // HTLC. If the incoming blinding point is non-nil, we know that we
        // are a relaying node in a blinded path. Otherwise, we're either an
        // introduction node or not part of a blinded path at all.
        err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason)
        if err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)

                return
        }

        // If the packet does not have a link failure set, it failed further
        // down the route, so we notify a forwarding failure. Otherwise, we
        // notify a link failure because it failed at our node.
        if pkt.linkFailure != nil {
                l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                        newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
                        pkt.linkFailure, false,
                )
        } else {
                l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
                        newHtlcKey(pkt), getEventType(pkt),
                )
        }

        // Immediately update the commitment tx to minimize latency.
        l.updateCommitTxOrFail(ctx)
}
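
// Failure reasons travel back to the sender onion-encrypted: the failing hop
// encrypts the failure with its shared secret, and every upstream hop
// re-wraps it before relaying. A hedged sketch of the shape of that step,
// assuming the hop.ErrorEncrypter interface backing pkt.obfuscator:
//
//	reason, err := pkt.obfuscator.EncryptFirstHop(failure)
//	if err != nil {
//		return err
//	}
//	// Intermediate hops instead re-wrap an already-encrypted reason via
//	// IntermediateEncrypt(reason).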